001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.master.replication; 019 020import java.io.IOException; 021import java.util.Optional; 022import org.apache.hadoop.hbase.ServerName; 023import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; 024import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; 025import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; 026import org.apache.hadoop.hbase.master.procedure.ServerRemoteProcedure; 027import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; 028import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; 029import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; 030import org.apache.hadoop.hbase.replication.regionserver.ClaimReplicationQueueCallable; 031import org.apache.yetus.audience.InterfaceAudience; 032import org.slf4j.Logger; 033import org.slf4j.LoggerFactory; 034 035import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; 036import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteParameter; 037import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.ClaimReplicationQueueRemoteStateData; 038 039@InterfaceAudience.Private 040public class ClaimReplicationQueueRemoteProcedure extends ServerRemoteProcedure 041 implements ServerProcedureInterface, RemoteProcedure<MasterProcedureEnv, ServerName> { 042 043 private static final Logger LOG = 044 LoggerFactory.getLogger(ClaimReplicationQueueRemoteProcedure.class); 045 046 private ServerName crashedServer; 047 048 private String queue; 049 050 public ClaimReplicationQueueRemoteProcedure() { 051 } 052 053 public ClaimReplicationQueueRemoteProcedure(ServerName crashedServer, String queue, 054 ServerName targetServer) { 055 this.crashedServer = crashedServer; 056 this.queue = queue; 057 this.targetServer = targetServer; 058 } 059 060 @Override 061 public Optional<RemoteOperation> remoteCallBuild(MasterProcedureEnv env, ServerName remote) { 062 assert targetServer.equals(remote); 063 return Optional.of(new ServerOperation(this, getProcId(), ClaimReplicationQueueCallable.class, 064 ClaimReplicationQueueRemoteParameter.newBuilder() 065 .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue).build() 066 .toByteArray())); 067 } 068 069 @Override 070 public ServerName getServerName() { 071 // return crashed server here, as we are going to recover its replication queues so we should 072 // use its scheduler queue instead of the one for the target server. 073 return crashedServer; 074 } 075 076 @Override 077 public boolean hasMetaTableRegion() { 078 return false; 079 } 080 081 @Override 082 public ServerOperationType getServerOperationType() { 083 return ServerOperationType.CLAIM_REPLICATION_QUEUE_REMOTE; 084 } 085 086 @Override 087 protected void complete(MasterProcedureEnv env, Throwable error) { 088 if (error != null) { 089 LOG.warn("Failed to claim replication queue {} of crashed server on server {} ", queue, 090 crashedServer, targetServer, error); 091 this.succ = false; 092 } else { 093 this.succ = true; 094 } 095 } 096 097 @Override 098 protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { 099 throw new UnsupportedOperationException(); 100 } 101 102 @Override 103 protected boolean abort(MasterProcedureEnv env) { 104 return false; 105 } 106 107 @Override 108 protected boolean waitInitialized(MasterProcedureEnv env) { 109 return env.waitInitialized(this); 110 } 111 112 @Override 113 protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { 114 serializer.serialize(ClaimReplicationQueueRemoteStateData.newBuilder() 115 .setCrashedServer(ProtobufUtil.toServerName(crashedServer)).setQueue(queue) 116 .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); 117 } 118 119 @Override 120 protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { 121 ClaimReplicationQueueRemoteStateData data = 122 serializer.deserialize(ClaimReplicationQueueRemoteStateData.class); 123 crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); 124 queue = data.getQueue(); 125 targetServer = ProtobufUtil.toServerName(data.getTargetServer()); 126 } 127}