001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.master.replication;
019
020import java.io.IOException;
021import java.util.Optional;
022import org.apache.hadoop.fs.Path;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.master.MasterFileSystem;
025import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
026import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
027import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
028import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
029import org.apache.hadoop.hbase.procedure2.Procedure;
030import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
031import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
032import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
033import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
034import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
035import org.apache.hadoop.hbase.replication.ReplicationQueueId;
036import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
037import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;
045
046@InterfaceAudience.Private
047public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
048  implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {
049
050  private static final Logger LOG =
051    LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);
052
053  private ReplicationQueueId queueId;
054
055  public ClaimReplicationQueueRemoteProcedure() {
056  }
057
058  public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerName targetServer) {
059    this.queueId = queueId;
060    this.targetServer = targetServer;
061  }
062
063  // check whether ReplicationSyncUp has already done the work for us, if so, we should skip
064  // claiming the replication queues and deleting them instead.
065  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
066    MasterFileSystem mfs = env.getMasterFileSystem();
067    Path syncUpDir = new Path(mfs.getRootDir(), ReplicationSyncUp.INFO_DIR);
068    return mfs.getFileSystem().exists(new Path(syncUpDir, getServerName().getServerName()));
069  }
070
071  @Override
072  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
073    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
074    try {
075      if (shouldSkip(env)) {
076        LOG.info("Skip claiming {} because replication sync up has already done it for us",
077          getServerName());
078        return null;
079      }
080    } catch (IOException e) {
081      LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
082        getServerName(), e);
083      // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
084      return null;
085    }
086    return super.execute(env);
087  }
088
089  @Override
090  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
091    assert targetServer.equals(remote);
092    ClaimReplicationQueueRemoteParameter.Builder builder = ClaimReplicationQueueRemoteParameter
093      .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()))
094      .setQueue(queueId.getPeerId());
095    queueId.getSourceServerName()
096      .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer)));
097    return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class,
098      builder.build().toByteArray()));
099  }
100
101  @Override
102  public ServerName getServerName() {
103    // return crashed server here, as we are going to recover its replication queues so we should
104    // use its scheduler queue instead of the one for the target server.
105    return queueId.getServerName();
106  }
107
108  @Override
109  public boolean hasMetaTableRegion() {
110    return false;
111  }
112
113  @Override
114  public ServerOperationType getServerOperationType() {
115    return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
116  }
117
118  @Override
119  protected void complete(MasterProcedureEnv env, Throwable error) {
120    if (error != null) {
121      LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error);
122      this.succ = false;
123    } else {
124      this.succ = true;
125    }
126  }
127
128  @Override
129  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
130    throw new UnsupportedOperationException();
131  }
132
133  @Override
134  protected boolean abort(MasterProcedureEnv env) {
135    return false;
136  }
137
138  @Override
139  protected boolean waitInitialized(MasterProcedureEnv env) {
140    return env.waitInitialized(this);
141  }
142
143  @Override
144  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
145    ClaimReplicationQueueRemoteStateData.Builder builder = ClaimReplicationQueueRemoteStateData
146      .newBuilder().setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()))
147      .setQueue(queueId.getPeerId()).setTargetServer(ProtobufUtil.toServerName(targetServer));
148    queueId.getSourceServerName()
149      .ifPresent(sourceServer -> builder.setSourceServer(ProtobufUtil.toServerName(sourceServer)));
150    serializer.serialize(builder.build());
151  }
152
153  @Override
154  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
155    ClaimReplicationQueueRemoteStateData data =
156      serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
157    targetServer = ProtobufUtil.toServerName(data.getTargetServer());
158    ServerName crashedServer = ProtobufUtil.toServerName(data.getCrashedServer());
159    String queue = data.getQueue();
160    if (data.hasSourceServer()) {
161      queueId = new ReplicationQueueId(crashedServer, queue,
162        ProtobufUtil.toServerName(data.getSourceServer()));
163    } else {
164      queueId = new ReplicationQueueId(crashedServer, queue);
165    }
166  }
167}