001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.procedure;
019
020import java.io.IOException;
021import java.util.List;
022import org.apache.hadoop.hbase.ServerName;
023import org.apache.hadoop.hbase.master.ServerListener;
024import org.apache.hadoop.hbase.master.ServerManager;
025import org.apache.hadoop.hbase.procedure2.Procedure;
026import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
027import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
028import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
029import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
030import org.apache.yetus.audience.InterfaceAudience;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.LastHighestWalFilenum;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollProcedureState;
037import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.LogRollRemoteProcedureResult;
038
039/**
040 * The procedure to perform WAL rolling on all of RegionServers.
041 */
042@InterfaceAudience.Private
043public class LogRollProcedure
044  extends StateMachineProcedure<MasterProcedureEnv, LogRollProcedureState>
045  implements GlobalProcedureInterface {
046
047  private static final Logger LOG = LoggerFactory.getLogger(LogRollProcedure.class);
048
049  public LogRollProcedure() {
050  }
051
052  @Override
053  protected Flow executeFromState(MasterProcedureEnv env, LogRollProcedureState state)
054    throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
055    LOG.info("{} execute state={}", this, state);
056
057    final ServerManager serverManager = env.getMasterServices().getServerManager();
058
059    try {
060      switch (state) {
061        case LOG_ROLL_ROLL_LOG_ON_RS:
062          // avoid potential new region server missing
063          serverManager.registerListener(new NewServerWALRoller(env));
064
065          final List<LogRollRemoteProcedure> subProcedures =
066            serverManager.getOnlineServersList().stream().map(LogRollRemoteProcedure::new).toList();
067          addChildProcedure(subProcedures.toArray(new LogRollRemoteProcedure[0]));
068          setNextState(LogRollProcedureState.LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM);
069          return Flow.HAS_MORE_STATE;
070        case LOG_ROLL_COLLECT_RS_HIGHEST_WAL_FILENUM:
071          // get children procedure
072          List<LogRollRemoteProcedure> children =
073            env.getMasterServices().getMasterProcedureExecutor().getProcedures().stream()
074              .filter(p -> p instanceof LogRollRemoteProcedure)
075              .filter(p -> p.getParentProcId() == getProcId()).map(p -> (LogRollRemoteProcedure) p)
076              .toList();
077          LastHighestWalFilenum.Builder builder = LastHighestWalFilenum.newBuilder();
078          for (Procedure<MasterProcedureEnv> child : children) {
079            LogRollRemoteProcedureResult result =
080              LogRollRemoteProcedureResult.parseFrom(child.getResult());
081            builder.putFileNum(ProtobufUtil.toServerName(result.getServerName()).toString(),
082              result.getLastHighestWalFilenum());
083          }
084          setResult(builder.build().toByteArray());
085          setNextState(LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER);
086          return Flow.HAS_MORE_STATE;
087        case LOG_ROLL_UNREGISTER_SERVER_LISTENER:
088          serverManager.unregisterListenerIf(l -> l instanceof NewServerWALRoller);
089          return Flow.NO_MORE_STATE;
090      }
091    } catch (Exception e) {
092      setFailure("log-roll", e);
093    }
094    return Flow.NO_MORE_STATE;
095  }
096
097  @Override
098  public String getGlobalId() {
099    return getClass().getSimpleName();
100  }
101
102  private static final class NewServerWALRoller implements ServerListener {
103
104    private final MasterProcedureEnv env;
105
106    public NewServerWALRoller(MasterProcedureEnv env) {
107      this.env = env;
108    }
109
110    @Override
111    public void serverAdded(ServerName server) {
112      env.getMasterServices().getMasterProcedureExecutor()
113        .submitProcedure(new LogRollRemoteProcedure(server));
114    }
115  }
116
117  @Override
118  protected void rollbackState(MasterProcedureEnv env, LogRollProcedureState state) {
119    // nothing to rollback
120  }
121
122  @Override
123  protected LogRollProcedureState getState(int stateId) {
124    return LogRollProcedureState.forNumber(stateId);
125  }
126
127  @Override
128  protected int getStateId(LogRollProcedureState state) {
129    return state.getNumber();
130  }
131
132  @Override
133  protected LogRollProcedureState getInitialState() {
134    return LogRollProcedureState.LOG_ROLL_ROLL_LOG_ON_RS;
135  }
136
137  @Override
138  protected boolean abort(MasterProcedureEnv env) {
139    return false;
140  }
141
142  @Override
143  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
144    super.serializeStateData(serializer);
145
146    if (getResult() != null && getResult().length > 0) {
147      serializer.serialize(LastHighestWalFilenum.parseFrom(getResult()));
148    } else {
149      serializer.serialize(LastHighestWalFilenum.getDefaultInstance());
150    }
151  }
152
153  @Override
154  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
155    super.deserializeStateData(serializer);
156
157    if (getResult() == null) {
158      LastHighestWalFilenum lastHighestWalFilenum =
159        serializer.deserialize(LastHighestWalFilenum.class);
160      if (lastHighestWalFilenum != null) {
161        if (
162          lastHighestWalFilenum.getFileNumMap().isEmpty()
163            && getCurrentState() == LogRollProcedureState.LOG_ROLL_UNREGISTER_SERVER_LISTENER
164        ) {
165          LOG.warn("pid = {}, current state is the last state, but rsHighestWalFilenumMap is "
166            + "empty, this should not happen. Are all region servers down ?", getProcId());
167        } else {
168          setResult(lastHighestWalFilenum.toByteArray());
169        }
170      }
171    }
172  }
173
174  @Override
175  protected void toStringClassDetails(StringBuilder sb) {
176    sb.append(getClass().getSimpleName());
177  }
178}