View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.concurrent.atomic.AtomicBoolean;
23  import java.util.concurrent.locks.ReentrantLock;
24  
25  import org.apache.commons.logging.Log;
26  import org.apache.commons.logging.LogFactory;
27  import org.apache.hadoop.classification.InterfaceAudience;
28  import org.apache.hadoop.fs.Path;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.HRegionInfo;
31  import org.apache.hadoop.hbase.HTableDescriptor;
32  import org.apache.hadoop.hbase.Server;
33  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
34  import org.apache.hadoop.hbase.regionserver.wal.HLog;
35  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
36  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
37  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
38  import org.apache.hadoop.hbase.util.Bytes;
39  import org.apache.hadoop.hbase.util.HasThread;
40  import org.apache.hadoop.ipc.RemoteException;
41  
42  /**
43   * Runs periodically to determine if the HLog should be rolled.
44   *
45   * NOTE: This class extends Thread rather than Chore because the sleep time
46   * can be interrupted when there is something to do, rather than the Chore
47   * sleep time which is invariant.
48   */
49  @InterfaceAudience.Private
50  class LogRoller extends HasThread implements WALActionsListener {
51    static final Log LOG = LogFactory.getLog(LogRoller.class);
52    private final ReentrantLock rollLock = new ReentrantLock();
53    private final AtomicBoolean rollLog = new AtomicBoolean(false);
54    private final Server server;
55    protected final RegionServerServices services;
56    private volatile long lastrolltime = System.currentTimeMillis();
57    // Period to roll log.
58    private final long rollperiod;
59    private final int threadWakeFrequency;
60  
61    /** @param server */
62    public LogRoller(final Server server, final RegionServerServices services) {
63      super();
64      this.server = server;
65      this.services = services;
66      this.rollperiod = this.server.getConfiguration().
67        getLong("hbase.regionserver.logroll.period", 3600000);
68      this.threadWakeFrequency = this.server.getConfiguration().
69        getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
70    }
71  
72    @Override
73    public void run() {
74      while (!server.isStopped()) {
75        long now = System.currentTimeMillis();
76        boolean periodic = false;
77        if (!rollLog.get()) {
78          periodic = (now - this.lastrolltime) > this.rollperiod;
79          if (!periodic) {
80            synchronized (rollLog) {
81              try {
82                if (!rollLog.get()) rollLog.wait(this.threadWakeFrequency);
83              } catch (InterruptedException e) {
84                // Fall through
85              }
86            }
87            continue;
88          }
89          // Time for periodic roll
90          if (LOG.isDebugEnabled()) {
91            LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
92          }
93        } else if (LOG.isDebugEnabled()) {
94          LOG.debug("HLog roll requested");
95        }
96        rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
97        try {
98          this.lastrolltime = now;
99          // Force the roll if the logroll.period is elapsed or if a roll was requested.
100         // The returned value is an array of actual region names.
101         byte [][] regionsToFlush = getWAL().rollWriter(periodic || rollLog.get());
102         if (regionsToFlush != null) {
103           for (byte [] r: regionsToFlush) scheduleFlush(r);
104         }
105       } catch (FailedLogCloseException e) {
106         server.abort("Failed log close in log roller", e);
107       } catch (java.net.ConnectException e) {
108         server.abort("Failed log close in log roller", e);
109       } catch (IOException ex) {
110         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
111         server.abort("IOE in log roller",
112           ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
113       } catch (Exception ex) {
114         LOG.error("Log rolling failed", ex);
115         server.abort("Log rolling failed", ex);
116       } finally {
117         try {
118           rollLog.set(false);
119         } finally {
120           rollLock.unlock();
121         }
122       }
123     }
124     LOG.info("LogRoller exiting.");
125   }
126 
127   /**
128    * @param encodedRegionName Encoded name of region to flush.
129    */
130   private void scheduleFlush(final byte [] encodedRegionName) {
131     boolean scheduled = false;
132     HRegion r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
133     FlushRequester requester = null;
134     if (r != null) {
135       requester = this.services.getFlushRequester();
136       if (requester != null) {
137         requester.requestFlush(r);
138         scheduled = true;
139       }
140     }
141     if (!scheduled) {
142       LOG.warn("Failed to schedule flush of " +
143         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
144         requester);
145     }
146   }
147 
148   public void logRollRequested() {
149     synchronized (rollLog) {
150       rollLog.set(true);
151       rollLog.notifyAll();
152     }
153   }
154 
155   protected HLog getWAL() throws IOException {
156     return this.services.getWAL(null);
157   }
158 
159   @Override
160   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
161     // Not interested
162   }
163 
164   @Override
165   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
166     // Not interested
167   }
168 
169   @Override
170   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
171     // Not interested
172   }
173 
174   @Override
175   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
176     // Not interested
177   }
178 
179   @Override
180   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
181       WALEdit logEdit) {
182     // Not interested.
183   }
184 
185   @Override
186   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
187                                        WALEdit logEdit) {
188     //Not interested
189   }
190 
191   @Override
192   public void logCloseRequested() {
193     // not interested
194   }
195 }