/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.replication;

import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData;

/**
 * Remote procedure that hands a {@link ClaimReplicationQueueCallable} operation to the target
 * server so that it claims the replication queue identified by {@code queueId}.
 * <p/>
 * The persisted state consists of the crashed server, the peer id of the queue, the target server
 * and, when the queue id carries one, the source server.
 */
@InterfaceAudience.Private
public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure
  implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> {

  private static final Logger LOG =
    LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class);

  /** The replication queue to be claimed; restored by {@link #deserializeStateData}. */
  private ReplicationQueueId queueId;

  /** No-arg constructor; the fields are populated later by {@link #deserializeStateData}. */
  public ClaimReplicationQueueRemoteProcedure() {
  }

  /**
   * @param queueId      the replication queue which should be claimed
   * @param targetServer the server which is asked to claim the queue
   */
  public ClaimReplicationQueueRemoteProcedure(ReplicationQueueId queueId, ServerName targetServer) {
    this.queueId = queueId;
    this.targetServer = targetServer;
  }

  /**
   * Check whether ReplicationSyncUp has already done the work for us. If so, we should skip
   * claiming the replication queue.
   * @return {@code true} if an entry named after the crashed server exists under the
   *         {@link ReplicationSyncUp#INFO_DIR} directory below the master root dir
   * @throws IOException if the file system lookup fails
   */
  private boolean shouldSkip(MasterProcedureEnv env) throws IOException {
    MasterFileSystem masterFs = env.getMasterFileSystem();
    Path infoDir = new Path(masterFs.getRootDir(), ReplicationSyncUp.INFO_DIR);
    Path serverMarker = new Path(infoDir, getServerName().getServerName());
    return masterFs.getFileSystem().exists(serverMarker);
  }

  /**
   * Finishes immediately (returns {@code null}) when the sync up check says the claim can be
   * skipped, or when the check itself fails; otherwise delegates to the super implementation.
   */
  @Override
  protected synchronized Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
    throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    boolean alreadySyncedUp;
    try {
      alreadySyncedUp = shouldSkip(env);
    } catch (IOException ioe) {
      LOG.warn("failed to check whether we should skip claiming {} due to replication sync up",
        getServerName(), ioe);
      // just finish the procedure here, as the AssignReplicationQueuesProcedure will reschedule
      return null;
    }
    if (alreadySyncedUp) {
      LOG.info("Skip claiming {} because replication sync up has already done it for us",
        getServerName());
      return null;
    }
    return super.execute(env);
  }

  /**
   * Builds the operation sent to the remote server: a {@link ServerOperation} for
   * {@link ClaimReplicationQueueCallable} whose payload is the serialized
   * {@link ClaimReplicationQueueRemoteParameter} describing the queue.
   */
  @Override
  public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) {
    assert targetServer.equals(remote);
    ClaimReplicationQueueRemoteParameter.Builder param =
      ClaimReplicationQueueRemoteParameter.newBuilder();
    param.setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()));
    param.setQueue(queueId.getPeerId());
    // the source server is optional, only set it when the queue id carries one
    queueId.getSourceServerName()
      .ifPresent(source -> param.setSourceServer(ProtobufUtil.toServerName(source)));
    byte[] payload = param.build().toByteArray();
    ServerOperation operation =
      new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class, payload);
    return Optional.of(operation);
  }

  /**
   * @return the crashed server rather than the target server. We are recovering the replication
   *         queues of the crashed server, so its scheduler queue is the one we should use.
   */
  @Override
  public ServerName getServerName() {
    return queueId.getServerName();
  }

  /** @return always {@code false} */
  @Override
  public boolean hasMetaTableRegion() {
    return false;
  }

  /** @return always {@link ServerOperationType#CLAIM_REPLICATION_QUEUE_REMOTE} */
  @Override
  public ServerOperationType getServerOperationType() {
    return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE;
  }

  /**
   * Records the outcome of the remote call in {@code succ}, logging a warning when an error is
   * passed in.
   * @param error the failure, or {@code null} if there is none
   */
  @Override
  protected void complete(MasterProcedureEnv env, Throwable error) {
    boolean succeeded = error == null;
    if (!succeeded) {
      LOG.warn("Failed to claim replication queue {} on server {} ", queueId, targetServer, error);
    }
    this.succ = succeeded;
  }

  /** Rollback is not supported; always throws {@link UnsupportedOperationException}. */
  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    throw new UnsupportedOperationException();
  }

  /** @return always {@code false} */
  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

  /** Delegates to {@link MasterProcedureEnv#waitInitialized(Procedure)}. */
  @Override
  protected boolean waitInitialized(MasterProcedureEnv env) {
    return env.waitInitialized(this);
  }

  /**
   * Writes the crashed server, the queue (peer id), the target server and the optional source
   * server as a {@link ClaimReplicationQueueRemoteStateData} message.
   */
  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    ClaimReplicationQueueRemoteStateData.Builder state =
      ClaimReplicationQueueRemoteStateData.newBuilder();
    state.setCrashedServer(ProtobufUtil.toServerName(queueId.getServerName()));
    state.setQueue(queueId.getPeerId());
    state.setTargetServer(ProtobufUtil.toServerName(targetServer));
    queueId.getSourceServerName()
      .ifPresent(source -> state.setSourceServer(ProtobufUtil.toServerName(source)));
    serializer.serialize(state.build());
  }

  /**
   * Restores {@code targetServer} and {@code queueId} from a
   * {@link ClaimReplicationQueueRemoteStateData} message, using the three-arg queue id
   * constructor only when a source server was persisted.
   */
  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    ClaimReplicationQueueRemoteStateData state =
      serializer.deserialize(ClaimReplicationQueueRemoteStateData.class);
    targetServer = ProtobufUtil.toServerName(state.getTargetServer());
    ServerName crashed = ProtobufUtil.toServerName(state.getCrashedServer());
    String peerId = state.getQueue();
    queueId = state.hasSourceServer()
      ? new ReplicationQueueId(crashed, peerId, ProtobufUtil.toServerName(state.getSourceServer()))
      : new ReplicationQueueId(crashed, peerId);
  }
}