001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.backup.regionserver; 019 020import java.nio.charset.StandardCharsets; 021import java.util.HashMap; 022import java.util.List; 023import java.util.concurrent.Callable; 024import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; 025import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager; 026import org.apache.hadoop.hbase.client.Connection; 027import org.apache.hadoop.hbase.errorhandling.ForeignException; 028import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher; 029import org.apache.hadoop.hbase.procedure.ProcedureMember; 030import org.apache.hadoop.hbase.procedure.Subprocedure; 031import org.apache.hadoop.hbase.regionserver.HRegionServer; 032import org.apache.hadoop.hbase.regionserver.RegionServerServices; 033import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; 034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 035import org.apache.hadoop.hbase.wal.WAL; 036import org.apache.yetus.audience.InterfaceAudience; 037import org.slf4j.Logger; 038import org.slf4j.LoggerFactory; 039 040/** 041 * This backup sub-procedure implementation forces a WAL rolling on a RS. 042 */ 043@InterfaceAudience.Private 044public class LogRollBackupSubprocedure extends Subprocedure { 045 private static final Logger LOG = LoggerFactory.getLogger(LogRollBackupSubprocedure.class); 046 047 private final RegionServerServices rss; 048 private final LogRollBackupSubprocedurePool taskManager; 049 private String backupRoot; 050 051 public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member, 052 ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout, 053 LogRollBackupSubprocedurePool taskManager, byte[] data) { 054 super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener, 055 wakeFrequency, timeout); 056 LOG.info("Constructing a LogRollBackupSubprocedure."); 057 this.rss = rss; 058 this.taskManager = taskManager; 059 if (data != null) { 060 backupRoot = new String(data, StandardCharsets.UTF_8); 061 } 062 } 063 064 /** 065 * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified 066 * with no use of sub-procedure pool. 067 */ 068 class RSRollLogTask implements Callable<Void> { 069 RSRollLogTask() { 070 } 071 072 @Override 073 public Void call() throws Exception { 074 if (LOG.isDebugEnabled()) { 075 LOG.debug("DRPC started: " + rss.getServerName()); 076 } 077 078 AbstractFSWAL<?> fsWAL = (AbstractFSWAL<?>) rss.getWAL(null); 079 long filenum = fsWAL.getFilenum(); 080 List<WAL> wals = rss.getWALs(); 081 long highest = -1; 082 for (WAL wal : wals) { 083 if (wal == null) { 084 continue; 085 } 086 087 if (((AbstractFSWAL<?>) wal).getFilenum() > highest) { 088 highest = ((AbstractFSWAL<?>) wal).getFilenum(); 089 } 090 } 091 092 LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum 093 + " highest: " + highest + " on " + rss.getServerName()); 094 ((HRegionServer) rss).getWalRoller().requestRollAll(); 095 long start = EnvironmentEdgeManager.currentTime(); 096 while (!((HRegionServer) rss).getWalRoller().walRollFinished()) { 097 Thread.sleep(20); 098 } 099 LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start)); 100 LOG.info("After roll log in backup subprocedure, current log number: " + fsWAL.getFilenum() 101 + " on " + rss.getServerName()); 102 103 Connection connection = rss.getConnection(); 104 try (final BackupSystemTable table = new BackupSystemTable(connection)) { 105 // sanity check, good for testing 106 HashMap<String, Long> serverTimestampMap = 107 table.readRegionServerLastLogRollResult(backupRoot); 108 String host = rss.getServerName().getHostname(); 109 int port = rss.getServerName().getPort(); 110 String server = host + ":" + port; 111 Long sts = serverTimestampMap.get(host); 112 if (sts != null && sts > highest) { 113 LOG 114 .warn("Won't update server's last roll log result: current=" + sts + " new=" + highest); 115 return null; 116 } 117 // write the log number to backup system table. 118 table.writeRegionServerLastLogRollResult(server, highest, backupRoot); 119 return null; 120 } catch (Exception e) { 121 LOG.error(e.toString(), e); 122 throw e; 123 } 124 } 125 } 126 127 private void rolllog() throws ForeignException { 128 monitor.rethrowException(); 129 130 taskManager.submitTask(new RSRollLogTask()); 131 monitor.rethrowException(); 132 133 // wait for everything to complete. 134 taskManager.waitForOutstandingTasks(); 135 monitor.rethrowException(); 136 } 137 138 @Override 139 public void acquireBarrier() { 140 // do nothing, executing in inside barrier step. 141 } 142 143 /** 144 * do a log roll. 145 * @return some bytes 146 */ 147 @Override 148 public byte[] insideBarrier() throws ForeignException { 149 rolllog(); 150 return null; 151 } 152 153 /** 154 * Cancel threads if they haven't finished. 155 */ 156 @Override 157 public void cleanup(Exception e) { 158 taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e); 159 } 160 161 /** 162 * Hooray! 163 */ 164 public void releaseBarrier() { 165 // NO OP 166 } 167}