001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.backup.regionserver;
019
020import java.nio.charset.StandardCharsets;
021import java.util.HashMap;
022import java.util.List;
023import java.util.concurrent.Callable;
024import org.apache.hadoop.hbase.backup.impl.BackupSystemTable;
025import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
026import org.apache.hadoop.hbase.client.Connection;
027import org.apache.hadoop.hbase.errorhandling.ForeignException;
028import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
029import org.apache.hadoop.hbase.procedure.ProcedureMember;
030import org.apache.hadoop.hbase.procedure.Subprocedure;
031import org.apache.hadoop.hbase.regionserver.HRegionServer;
032import org.apache.hadoop.hbase.regionserver.RegionServerServices;
033import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
034import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
035import org.apache.hadoop.hbase.wal.WAL;
036import org.apache.yetus.audience.InterfaceAudience;
037import org.slf4j.Logger;
038import org.slf4j.LoggerFactory;
039
040/**
041 * This backup sub-procedure implementation forces a WAL rolling on a RS.
042 */
043@InterfaceAudience.Private
044public class LogRollBackupSubprocedure extends Subprocedure {
045  private static final Logger LOG = LoggerFactory.getLogger(LogRollBackupSubprocedure.class);
046
047  private final RegionServerServices rss;
048  private final LogRollBackupSubprocedurePool taskManager;
049  private String backupRoot;
050
051  public LogRollBackupSubprocedure(RegionServerServices rss, ProcedureMember member,
052    ForeignExceptionDispatcher errorListener, long wakeFrequency, long timeout,
053    LogRollBackupSubprocedurePool taskManager, byte[] data) {
054    super(member, LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_NAME, errorListener,
055      wakeFrequency, timeout);
056    LOG.info("Constructing a LogRollBackupSubprocedure.");
057    this.rss = rss;
058    this.taskManager = taskManager;
059    if (data != null) {
060      backupRoot = new String(data, StandardCharsets.UTF_8);
061    }
062  }
063
064  /**
065   * Callable task. TODO. We don't need a thread pool to execute roll log. This can be simplified
066   * with no use of sub-procedure pool.
067   */
068  class RSRollLogTask implements Callable<Void> {
069    RSRollLogTask() {
070    }
071
072    @Override
073    public Void call() throws Exception {
074      if (LOG.isDebugEnabled()) {
075        LOG.debug("DRPC started: " + rss.getServerName());
076      }
077
078      AbstractFSWAL<?> fsWAL = (AbstractFSWAL<?>) rss.getWAL(null);
079      long filenum = fsWAL.getFilenum();
080      List<WAL> wals = rss.getWALs();
081      long highest = -1;
082      for (WAL wal : wals) {
083        if (wal == null) {
084          continue;
085        }
086
087        if (((AbstractFSWAL<?>) wal).getFilenum() > highest) {
088          highest = ((AbstractFSWAL<?>) wal).getFilenum();
089        }
090      }
091
092      LOG.info("Trying to roll log in backup subprocedure, current log number: " + filenum
093        + " highest: " + highest + " on " + rss.getServerName());
094      ((HRegionServer) rss).getWalRoller().requestRollAll();
095      long start = EnvironmentEdgeManager.currentTime();
096      while (!((HRegionServer) rss).getWalRoller().walRollFinished()) {
097        Thread.sleep(20);
098      }
099      LOG.debug("log roll took " + (EnvironmentEdgeManager.currentTime() - start));
100      LOG.info("After roll log in backup subprocedure, current log number: " + fsWAL.getFilenum()
101        + " on " + rss.getServerName());
102
103      Connection connection = rss.getConnection();
104      try (final BackupSystemTable table = new BackupSystemTable(connection)) {
105        // sanity check, good for testing
106        HashMap<String, Long> serverTimestampMap =
107          table.readRegionServerLastLogRollResult(backupRoot);
108        String host = rss.getServerName().getHostname();
109        int port = rss.getServerName().getPort();
110        String server = host + ":" + port;
111        Long sts = serverTimestampMap.get(host);
112        if (sts != null && sts > highest) {
113          LOG
114            .warn("Won't update server's last roll log result: current=" + sts + " new=" + highest);
115          return null;
116        }
117        // write the log number to backup system table.
118        table.writeRegionServerLastLogRollResult(server, highest, backupRoot);
119        return null;
120      } catch (Exception e) {
121        LOG.error(e.toString(), e);
122        throw e;
123      }
124    }
125  }
126
127  private void rolllog() throws ForeignException {
128    monitor.rethrowException();
129
130    taskManager.submitTask(new RSRollLogTask());
131    monitor.rethrowException();
132
133    // wait for everything to complete.
134    taskManager.waitForOutstandingTasks();
135    monitor.rethrowException();
136  }
137
138  @Override
139  public void acquireBarrier() {
140    // do nothing, executing in inside barrier step.
141  }
142
143  /**
144   * do a log roll.
145   * @return some bytes
146   */
147  @Override
148  public byte[] insideBarrier() throws ForeignException {
149    rolllog();
150    return null;
151  }
152
153  /**
154   * Cancel threads if they haven't finished.
155   */
156  @Override
157  public void cleanup(Exception e) {
158    taskManager.abort("Aborting log roll subprocedure tasks for backup due to error", e);
159  }
160
161  /**
162   * Hooray!
163   */
164  public void releaseBarrier() {
165    // NO OP
166  }
167}