View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import org.apache.commons.logging.Log;
22  import org.apache.commons.logging.LogFactory;
23  import org.apache.hadoop.classification.InterfaceAudience;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.hbase.*;
26  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
27  import org.apache.hadoop.hbase.regionserver.wal.HLog;
28  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
29  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
30  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.HasThread;
33  
34  import java.io.IOException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.locks.ReentrantLock;
37  
38  /**
39   * Runs periodically to determine if the HLog should be rolled.
40   *
41   * NOTE: This class extends Thread rather than Chore because the sleep time
42   * can be interrupted when there is something to do, rather than the Chore
43   * sleep time which is invariant.
44   */
45  @InterfaceAudience.Private
46  class LogRoller extends HasThread implements WALActionsListener {
47    static final Log LOG = LogFactory.getLog(LogRoller.class);
48    private final ReentrantLock rollLock = new ReentrantLock();
49    private final AtomicBoolean rollLog = new AtomicBoolean(false);
50    private final Server server;
51    protected final RegionServerServices services;
52    private volatile long lastrolltime = System.currentTimeMillis();
53    // Period to roll log.
54    private final long rollperiod;
55    private final int threadWakeFrequency;
56  
57    /** @param server */
58    public LogRoller(final Server server, final RegionServerServices services) {
59      super();
60      this.server = server;
61      this.services = services;
62      this.rollperiod = this.server.getConfiguration().
63        getLong("hbase.regionserver.logroll.period", 3600000);
64      this.threadWakeFrequency = this.server.getConfiguration().
65        getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
66    }
67  
68    @Override
69    public void run() {
70      while (!server.isStopped()) {
71        long now = System.currentTimeMillis();
72        boolean periodic = false;
73        if (!rollLog.get()) {
74          periodic = (now - this.lastrolltime) > this.rollperiod;
75          if (!periodic) {
76            synchronized (rollLog) {
77              try {
78                rollLog.wait(this.threadWakeFrequency);
79              } catch (InterruptedException e) {
80                // Fall through
81              }
82            }
83            continue;
84          }
85          // Time for periodic roll
86          if (LOG.isDebugEnabled()) {
87            LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
88          }
89        } else if (LOG.isDebugEnabled()) {
90          LOG.debug("HLog roll requested");
91        }
92        rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
93        try {
94          this.lastrolltime = now;
95          // Force the roll if the logroll.period is elapsed or if a roll was requested.
96          // The returned value is an array of actual region names.
97          byte [][] regionsToFlush = getWAL().rollWriter(periodic || rollLog.get());
98          if (regionsToFlush != null) {
99            for (byte [] r: regionsToFlush) scheduleFlush(r);
100         }
101       } catch (FailedLogCloseException e) {
102         server.abort("Failed log close in log roller", e);
103       } catch (java.net.ConnectException e) {
104         server.abort("Failed log close in log roller", e);
105       } catch (IOException ex) {
106         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
107         server.abort("IOE in log roller",
108           RemoteExceptionHandler.checkIOException(ex));
109       } catch (Exception ex) {
110         LOG.error("Log rolling failed", ex);
111         server.abort("Log rolling failed", ex);
112       } finally {
113         try {
114           rollLog.set(false);
115         } finally {
116           rollLock.unlock();
117         }
118       }
119     }
120     LOG.info("LogRoller exiting.");
121   }
122 
123   /**
124    * @param encodedRegionName Encoded name of region to flush.
125    */
126   private void scheduleFlush(final byte [] encodedRegionName) {
127     boolean scheduled = false;
128     HRegion r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
129     FlushRequester requester = null;
130     if (r != null) {
131       requester = this.services.getFlushRequester();
132       if (requester != null) {
133         requester.requestFlush(r);
134         scheduled = true;
135       }
136     }
137     if (!scheduled) {
138       LOG.warn("Failed to schedule flush of " +
139         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
140         requester);
141     }
142   }
143 
144   public void logRollRequested() {
145     synchronized (rollLog) {
146       rollLog.set(true);
147       rollLog.notifyAll();
148     }
149   }
150 
151   /**
152    * Called by region server to wake up this thread if it sleeping.
153    * It is sleeping if rollLock is not held.
154    */
155   public void interruptIfNecessary() {
156     try {
157       rollLock.lock();
158       this.interrupt();
159     } finally {
160       rollLock.unlock();
161     }
162   }
163 
164   protected HLog getWAL() throws IOException {
165     return this.services.getWAL(null);
166   }
167 
168   @Override
169   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
170     // Not interested
171   }
172 
173   @Override
174   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
175     // Not interested
176   }
177 
178   @Override
179   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
180     // Not interested
181   }
182 
183   @Override
184   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
185     // Not interested
186   }
187 
188   @Override
189   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
190       WALEdit logEdit) {
191     // Not interested.
192   }
193 
194   @Override
195   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
196                                        WALEdit logEdit) {
197     //Not interested
198   }
199 
200   @Override
201   public void logCloseRequested() {
202     // not interested
203   }
204 }