001/* 002 * 003 * Licensed to the Apache Software Foundation (ASF) under one 004 * or more contributor license agreements. See the NOTICE file 005 * distributed with this work for additional information 006 * regarding copyright ownership. The ASF licenses this file 007 * to you under the Apache License, Version 2.0 (the 008 * "License"); you may not use this file except in compliance 009 * with the License. You may obtain a copy of the License at 010 * 011 * http://www.apache.org/licenses/LICENSE-2.0 012 * 013 * Unless required by applicable law or agreed to in writing, software 014 * distributed under the License is distributed on an "AS IS" BASIS, 015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 016 * See the License for the specific language governing permissions and 017 * limitations under the License. 018 */ 019package org.apache.hadoop.hbase.regionserver; 020 021import java.io.Closeable; 022import java.io.IOException; 023import java.net.ConnectException; 024import java.util.ArrayList; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map.Entry; 028import java.util.concurrent.ConcurrentHashMap; 029import java.util.concurrent.ConcurrentMap; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 032import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; 033import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; 034import org.apache.hadoop.hbase.util.Bytes; 035import org.apache.hadoop.hbase.util.HasThread; 036import org.apache.hadoop.hbase.wal.WAL; 037import org.apache.hadoop.ipc.RemoteException; 038import org.apache.yetus.audience.InterfaceAudience; 039import org.slf4j.Logger; 040import org.slf4j.LoggerFactory; 041 042import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; 043 044/** 045 * Runs periodically to determine if the WAL should be rolled. 046 * 047 * NOTE: This class extends Thread rather than Chore because the sleep time 048 * can be interrupted when there is something to do, rather than the Chore 049 * sleep time which is invariant. 050 * 051 * TODO: change to a pool of threads 052 */ 053@InterfaceAudience.Private 054@VisibleForTesting 055public class LogRoller extends HasThread implements Closeable { 056 private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class); 057 private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>(); 058 protected final RegionServerServices services; 059 private volatile long lastRollTime = System.currentTimeMillis(); 060 // Period to roll log. 061 private final long rollPeriod; 062 private final int threadWakeFrequency; 063 // The interval to check low replication on hlog's pipeline 064 private long checkLowReplicationInterval; 065 066 private volatile boolean running = true; 067 068 public void addWAL(WAL wal) { 069 // check without lock first 070 if (walNeedsRoll.containsKey(wal)) { 071 return; 072 } 073 // this is to avoid race between addWAL and requestRollAll. 074 synchronized (this) { 075 if (walNeedsRoll.putIfAbsent(wal, Boolean.FALSE) == null) { 076 wal.registerWALActionsListener(new WALActionsListener() { 077 @Override 078 public void logRollRequested(boolean lowReplicas) { 079 // TODO logs will contend with each other here, replace with e.g. DelayedQueue 080 synchronized (LogRoller.this) { 081 walNeedsRoll.put(wal, Boolean.TRUE); 082 LogRoller.this.notifyAll(); 083 } 084 } 085 }); 086 } 087 } 088 } 089 090 public void requestRollAll() { 091 synchronized (this) { 092 List<WAL> wals = new ArrayList<WAL>(walNeedsRoll.keySet()); 093 for (WAL wal : wals) { 094 walNeedsRoll.put(wal, Boolean.TRUE); 095 } 096 notifyAll(); 097 } 098 } 099 100 public LogRoller(RegionServerServices services) { 101 super("LogRoller"); 102 this.services = services; 103 this.rollPeriod = this.services.getConfiguration(). 104 getLong("hbase.regionserver.logroll.period", 3600000); 105 this.threadWakeFrequency = this.services.getConfiguration(). 106 getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); 107 this.checkLowReplicationInterval = this.services.getConfiguration().getLong( 108 "hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000); 109 } 110 111 /** 112 * we need to check low replication in period, see HBASE-18132 113 */ 114 private void checkLowReplication(long now) { 115 try { 116 for (Entry<WAL, Boolean> entry : walNeedsRoll.entrySet()) { 117 WAL wal = entry.getKey(); 118 boolean needRollAlready = entry.getValue(); 119 if (needRollAlready || !(wal instanceof AbstractFSWAL)) { 120 continue; 121 } 122 ((AbstractFSWAL<?>) wal).checkLogLowReplication(checkLowReplicationInterval); 123 } 124 } catch (Throwable e) { 125 LOG.warn("Failed checking low replication", e); 126 } 127 } 128 129 private void abort(String reason, Throwable cause) { 130 // close all WALs before calling abort on RS. 131 // This is because AsyncFSWAL replies on us for rolling a new writer to make progress, and if we 132 // failed, AsyncFSWAL may be stuck, so we need to close it to let the upper layer know that it 133 // is already broken. 134 for (WAL wal : walNeedsRoll.keySet()) { 135 // shutdown rather than close here since we are going to abort the RS and the wals need to be 136 // split when recovery 137 try { 138 wal.shutdown(); 139 } catch (IOException e) { 140 LOG.warn("Failed to shutdown wal", e); 141 } 142 } 143 this.services.abort(reason, cause); 144 } 145 146 @Override 147 public void run() { 148 while (running) { 149 boolean periodic = false; 150 long now = System.currentTimeMillis(); 151 checkLowReplication(now); 152 periodic = (now - this.lastRollTime) > this.rollPeriod; 153 if (periodic) { 154 // Time for periodic roll, fall through 155 LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod); 156 } else { 157 synchronized (this) { 158 if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) { 159 // WAL roll requested, fall through 160 LOG.debug("WAL roll requested"); 161 } else { 162 try { 163 wait(this.threadWakeFrequency); 164 } catch (InterruptedException e) { 165 // restore the interrupt state 166 Thread.currentThread().interrupt(); 167 } 168 // goto the beginning to check whether again whether we should fall through to roll 169 // several WALs, and also check whether we should quit. 170 continue; 171 } 172 } 173 } 174 try { 175 this.lastRollTime = System.currentTimeMillis(); 176 for (Iterator<Entry<WAL, Boolean>> iter = walNeedsRoll.entrySet().iterator(); iter 177 .hasNext();) { 178 Entry<WAL, Boolean> entry = iter.next(); 179 WAL wal = entry.getKey(); 180 // reset the flag in front to avoid missing roll request before we return from rollWriter. 181 walNeedsRoll.put(wal, Boolean.FALSE); 182 // Force the roll if the logroll.period is elapsed or if a roll was requested. 183 // The returned value is an array of actual region names. 184 byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue()); 185 if (regionsToFlush != null) { 186 for (byte[] r : regionsToFlush) { 187 scheduleFlush(Bytes.toString(r)); 188 } 189 } 190 } 191 } catch (FailedLogCloseException | ConnectException e) { 192 abort("Failed log close in log roller", e); 193 } catch (IOException ex) { 194 // Abort if we get here. We probably won't recover an IOE. HBASE-1132 195 abort("IOE in log roller", 196 ex instanceof RemoteException ? ((RemoteException) ex).unwrapRemoteException() : ex); 197 } catch (Exception ex) { 198 LOG.error("Log rolling failed", ex); 199 abort("Log rolling failed", ex); 200 } 201 } 202 LOG.info("LogRoller exiting."); 203 } 204 205 /** 206 * @param encodedRegionName Encoded name of region to flush. 207 */ 208 private void scheduleFlush(String encodedRegionName) { 209 HRegion r = (HRegion) this.services.getRegion(encodedRegionName); 210 if (r == null) { 211 LOG.warn("Failed to schedule flush of {}, because it is not online on us", encodedRegionName); 212 return; 213 } 214 FlushRequester requester = this.services.getFlushRequester(); 215 if (requester == null) { 216 LOG.warn("Failed to schedule flush of {}, region={}, because FlushRequester is null", 217 encodedRegionName, r); 218 return; 219 } 220 // force flushing all stores to clean old logs 221 requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY); 222 } 223 224 /** 225 * For testing only 226 * @return true if all WAL roll finished 227 */ 228 @VisibleForTesting 229 public boolean walRollFinished() { 230 return walNeedsRoll.values().stream().allMatch(needRoll -> !needRoll); 231 } 232 233 @Override 234 public void close() { 235 running = false; 236 interrupt(); 237 } 238}