View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver;
20  
21  import java.io.IOException;
22  import java.util.Map.Entry;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.locks.ReentrantLock;
26  
27  import org.apache.commons.logging.Log;
28  import org.apache.commons.logging.LogFactory;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.HConstants;
31  import org.apache.hadoop.hbase.Server;
32  import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
33  import org.apache.hadoop.hbase.wal.WAL;
34  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
35  import org.apache.hadoop.hbase.util.Bytes;
36  import org.apache.hadoop.hbase.util.HasThread;
37  import org.apache.hadoop.ipc.RemoteException;
38  
39  /**
40   * Runs periodically to determine if the WAL should be rolled.
41   *
42   * NOTE: This class extends Thread rather than Chore because the sleep time
43   * can be interrupted when there is something to do, rather than the Chore
44   * sleep time which is invariant.
45   *
46   * TODO: change to a pool of threads
47   */
48  @InterfaceAudience.Private
49  class LogRoller extends HasThread {
50    static final Log LOG = LogFactory.getLog(LogRoller.class);
51    private final ReentrantLock rollLock = new ReentrantLock();
52    private final AtomicBoolean rollLog = new AtomicBoolean(false);
53    private final ConcurrentHashMap<WAL, Boolean> walNeedsRoll =
54        new ConcurrentHashMap<WAL, Boolean>();
55    private final Server server;
56    protected final RegionServerServices services;
57    private volatile long lastrolltime = System.currentTimeMillis();
58    // Period to roll log.
59    private final long rollperiod;
60    private final int threadWakeFrequency;
61  
62    public void addWAL(final WAL wal) {
63      if (null == walNeedsRoll.putIfAbsent(wal, Boolean.FALSE)) {
64        wal.registerWALActionsListener(new WALActionsListener.Base() {
65          @Override
66          public void logRollRequested() {
67            walNeedsRoll.put(wal, Boolean.TRUE);
68            // TODO logs will contend with each other here, replace with e.g. DelayedQueue
69            synchronized(rollLog) {
70              rollLog.set(true);
71              rollLog.notifyAll();
72            }
73          }
74        });
75      }
76    }
77  
78    public void requestRollAll() {
79      for (WAL wal : walNeedsRoll.keySet()) {
80        walNeedsRoll.put(wal, Boolean.TRUE);
81      }
82      synchronized(rollLog) {
83        rollLog.set(true);
84        rollLog.notifyAll();
85      }
86    }
87  
88    /** @param server */
89    public LogRoller(final Server server, final RegionServerServices services) {
90      super();
91      this.server = server;
92      this.services = services;
93      this.rollperiod = this.server.getConfiguration().
94        getLong("hbase.regionserver.logroll.period", 3600000);
95      this.threadWakeFrequency = this.server.getConfiguration().
96        getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
97    }
98  
99    @Override
100   public void run() {
101     while (!server.isStopped()) {
102       long now = System.currentTimeMillis();
103       boolean periodic = false;
104       if (!rollLog.get()) {
105         periodic = (now - this.lastrolltime) > this.rollperiod;
106         if (!periodic) {
107           synchronized (rollLog) {
108             try {
109               if (!rollLog.get()) rollLog.wait(this.threadWakeFrequency);
110             } catch (InterruptedException e) {
111               // Fall through
112             }
113           }
114           continue;
115         }
116         // Time for periodic roll
117         if (LOG.isDebugEnabled()) {
118           LOG.debug("Wal roll period " + this.rollperiod + "ms elapsed");
119         }
120       } else if (LOG.isDebugEnabled()) {
121         LOG.debug("WAL roll requested");
122       }
123       rollLock.lock(); // FindBugs UL_UNRELEASED_LOCK_EXCEPTION_PATH
124       try {
125         this.lastrolltime = now;
126         for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) {
127           final WAL wal = entry.getKey();
128           // Force the roll if the logroll.period is elapsed or if a roll was requested.
129           // The returned value is an array of actual region names.
130           final byte [][] regionsToFlush = wal.rollWriter(periodic ||
131               entry.getValue().booleanValue());
132           walNeedsRoll.put(wal, Boolean.FALSE);
133           if (regionsToFlush != null) {
134             for (byte [] r: regionsToFlush) scheduleFlush(r);
135           }
136         }
137       } catch (FailedLogCloseException e) {
138         server.abort("Failed log close in log roller", e);
139       } catch (java.net.ConnectException e) {
140         server.abort("Failed log close in log roller", e);
141       } catch (IOException ex) {
142         // Abort if we get here.  We probably won't recover an IOE. HBASE-1132
143         server.abort("IOE in log roller",
144           ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex);
145       } catch (Exception ex) {
146         LOG.error("Log rolling failed", ex);
147         server.abort("Log rolling failed", ex);
148       } finally {
149         try {
150           rollLog.set(false);
151         } finally {
152           rollLock.unlock();
153         }
154       }
155     }
156     LOG.info("LogRoller exiting.");
157   }
158 
159   /**
160    * @param encodedRegionName Encoded name of region to flush.
161    */
162   private void scheduleFlush(final byte [] encodedRegionName) {
163     boolean scheduled = false;
164     HRegion r = this.services.getFromOnlineRegions(Bytes.toString(encodedRegionName));
165     FlushRequester requester = null;
166     if (r != null) {
167       requester = this.services.getFlushRequester();
168       if (requester != null) {
169         requester.requestFlush(r);
170         scheduled = true;
171       }
172     }
173     if (!scheduled) {
174       LOG.warn("Failed to schedule flush of " +
175         Bytes.toString(encodedRegionName) + ", region=" + r + ", requester=" +
176         requester);
177     }
178   }
179 
180 }