View Javadoc

1   /**
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  package org.apache.hadoop.hbase.regionserver;
21  
22  import org.apache.commons.logging.Log;
23  import org.apache.commons.logging.LogFactory;
24  import org.apache.hadoop.fs.Path;
25  import org.apache.hadoop.hbase.*;
26  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
27  import org.apache.hadoop.hbase.regionserver.wal.HLog;
28  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
29  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
30  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
31  import org.apache.hadoop.hbase.util.Bytes;
32  import org.apache.hadoop.hbase.util.HasThread;
33  
34  import java.io.IOException;
35  import java.util.concurrent.atomic.AtomicBoolean;
36  import java.util.concurrent.locks.ReentrantLock;
37  
38  /**
39   * Runs periodically to determine if the HLog should be rolled.
40   *
41   * NOTE: This class extends Thread rather than Chore because the sleep time
42   * can be interrupted when there is something to do, rather than the Chore
43   * sleep time which is invariant.
44   */
45  class LogRoller extends HasThread implements WALActionsListener {
46    static final Log LOG = LogFactory.getLog(LogRoller.class);
47    private final ReentrantLock rollLock = new ReentrantLock();
48    private final AtomicBoolean rollLog = new AtomicBoolean(false);
49    private final Server server;
50    protected final RegionServerServices services;
51    private volatile long lastrolltime = System.currentTimeMillis();
52    // Period to roll log.
53    private final long rollperiod;
54    private final int threadWakeFrequency;
55  
56    /** @param server */
57    public LogRoller(final Server server, final RegionServerServices services) {
58      super();
59      this.server = server;
60      this.services = services;
61      this.rollperiod = this.server.getConfiguration().
62        getLong("hbase.regionserver.logroll.period", 3600000);
63      this.threadWakeFrequency = this.server.getConfiguration().
64        getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
65    }
66  
67    @Override
68    public void run() {
69      while (!server.isStopped()) {
70        long now = System.currentTimeMillis();
71        boolean periodic = false;
72        if (!rollLog.get()) {
73          periodic = (now - this.lastrolltime) > this.rollperiod;
74          if (!periodic) {
75            synchronized (rollLog) {
76              try {
77                rollLog.wait(this.threadWakeFrequency);
78              } catch (InterruptedException e) {
79                // Fall through
80              }
81            }
82            continue;
83          }
84          // Time for periodic roll
85          if (LOG.isDebugEnabled()) {
86            LOG.debug("Hlog roll period " + this.rollperiod + "ms elapsed");
87          }
88        } else if (LOG.isDebugEnabled()) {
89          LOG.debug("HLog roll requested");
90        }
91        rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
92        try {
93          this.lastrolltime = now;
94          // Force the roll if the logroll.period is elapsed or if a roll was requested.
95          // The returned value is an array of actual region names.
96          byte [][] regionsToFlush = getWAL().rollWriter(periodic || rollLog.get());
97          if (regionsToFlush != null) {
98            for (byte [] r: regionsToFlush) scheduleFlush(r);
99          }
100       } catch (FailedLogCloseException e) {
101         server.abort("Failed log close in log roller", e);
102       } catch (java.net.ConnectException e) {
103         server.abort("Failed log close in log roller", e);
104       } catch (IOException ex) {
105         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
106         server.abort("IOE in log roller",
107           RemoteExceptionHandler.checkIOException(ex));
108       } catch (Exception ex) {
109         LOG.error("Log rolling failed", ex);
110         server.abort("Log rolling failed", ex);
111       } finally {
112         rollLog.set(false);
113         rollLock.unlock();
114       }
115     }
116     LOG.info("LogRoller exiting.");
117   }
118 
119   /**
120    * @param encodedRegionName Encoded name of region to flush.
121    */
122   private void scheduleFlush(final byte [] encodedRegionName) {
123     boolean scheduled = false;
124     HRegion r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
125     FlushRequester requester = null;
126     if (r != null) {
127       requester = this.services.getFlushRequester();
128       if (requester != null) {
129         requester.requestFlush(r);
130         scheduled = true;
131       }
132     }
133     if (!scheduled) {
134       LOG.warn("Failed to schedule flush of " +
135         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
136         requester);
137     }
138   }
139 
140   public void logRollRequested() {
141     synchronized (rollLog) {
142       rollLog.set(true);
143       rollLog.notifyAll();
144     }
145   }
146 
147   /**
148    * Called by region server to wake up this thread if it sleeping.
149    * It is sleeping if rollLock is not held.
150    */
151   public void interruptIfNecessary() {
152     try {
153       rollLock.lock();
154       this.interrupt();
155     } finally {
156       rollLock.unlock();
157     }
158   }
159 
160   protected HLog getWAL() throws IOException {
161     return this.services.getWAL(null);
162   }
163 
164   @Override
165   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
166     // Not interested
167   }
168 
169   @Override
170   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
171     // Not interested
172   }
173 
174   @Override
175   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
176     // Not interested
177   }
178 
179   @Override
180   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
181     // Not interested
182   }
183 
184   @Override
185   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
186       WALEdit logEdit) {
187     // Not interested.
188   }
189 
190   @Override
191   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
192                                        WALEdit logEdit) {
193     //Not interested
194   }
195 
196   @Override
197   public void logCloseRequested() {
198     // not interested
199   }
200 }