001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.Optional; 022import org.apache.hadoop.conf.Configuration; 023import org.apache.hadoop.hbase.ServerName; 024import org.apache.hadoop.hbase.coprocessor.RegionObserver; 025import org.apache.hadoop.hbase.ipc.RpcCall; 026import org.apache.hadoop.hbase.ipc.RpcServer; 027import org.apache.hadoop.hbase.ipc.ServerCall; 028import org.apache.hadoop.hbase.regionserver.RSRpcServices; 029import org.apache.hadoop.hbase.regionserver.RegionScanner; 030import org.apache.hadoop.hbase.security.User; 031import org.apache.hadoop.hbase.security.UserProvider; 032import org.apache.yetus.audience.InterfaceAudience; 033 034import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 035import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 036 037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; 038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; 039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse; 040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest; 041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse; 045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; 046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; 047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest; 048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse; 049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; 050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; 051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest; 052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse; 053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; 054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; 055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; 056 057@InterfaceAudience.Private 058public final class ServerConnectionUtils { 059 060 private ServerConnectionUtils() { 061 } 062 063 /** 064 * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost 065 * if the invocation target is 'this' server; save on network and protobuf invocations. 066 */ 067 // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid. 068 // Class is visible so can assert we are short-circuiting when expected. 069 public final static class ShortCircuitingClusterConnection extends ConnectionImplementation { 070 private final ServerName serverName; 071 private final AdminService.BlockingInterface localHostAdmin; 072 private final ClientService.BlockingInterface localHostClient; 073 private final ClientService.BlockingInterface localClientServiceBlockingInterfaceWrapper; 074 075 private ShortCircuitingClusterConnection(Configuration conf, User user, ServerName serverName, 076 AdminService.BlockingInterface admin, ClientService.BlockingInterface client, 077 ConnectionRegistry registry) throws IOException { 078 super(conf, null, user, registry); 079 this.serverName = serverName; 080 this.localHostAdmin = admin; 081 this.localHostClient = client; 082 this.localClientServiceBlockingInterfaceWrapper = 083 new ClientServiceBlockingInterfaceWrapper(this.localHostClient); 084 } 085 086 @Override 087 public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException { 088 return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn); 089 } 090 091 @Override 092 public ClientService.BlockingInterface getClient(ServerName sn) throws IOException { 093 return serverName.equals(sn) 094 ? this.localClientServiceBlockingInterfaceWrapper 095 : super.getClient(sn); 096 } 097 098 @Override 099 public MasterKeepAliveConnection getMaster() throws IOException { 100 if (this.localHostClient instanceof MasterService.BlockingInterface) { 101 return new ShortCircuitMasterConnection( 102 (MasterService.BlockingInterface) this.localHostClient); 103 } 104 return super.getMaster(); 105 } 106 107 /** 108 * When we directly invoke {@link RSRpcServices#get} on the same RegionServer through 109 * {@link ShortCircuitingClusterConnection} in region CPs such as 110 * {@link RegionObserver#postScannerOpen} to get other rows, the {@link RegionScanner} created 111 * for the directly {@link RSRpcServices#get} may not be closed until the outmost rpc call is 112 * completed if there is an outmost {@link RpcCall}, and even worse , the 113 * {@link ServerCall#rpcCallback} may be override which would cause serious problem,so for 114 * {@link ShortCircuitingClusterConnection#getClient}, if return 115 * {@link ShortCircuitingClusterConnection#localHostClient},we would add a wrapper class to wrap 116 * it , which using {@link RpcServer#unsetCurrentCall} and {RpcServer#setCurrentCall} to 117 * surround the scan and get method call,so the {@link RegionScanner} created for the directly 118 * {@link RSRpcServices#get} could be closed immediately,see HBASE-26812 for more. 119 */ 120 static class ClientServiceBlockingInterfaceWrapper implements ClientService.BlockingInterface { 121 122 private ClientService.BlockingInterface target; 123 124 ClientServiceBlockingInterfaceWrapper(ClientService.BlockingInterface target) { 125 this.target = target; 126 } 127 128 @Override 129 public GetResponse get(RpcController controller, GetRequest request) throws ServiceException { 130 return this.doCall(controller, request, (c, r) -> { 131 return target.get(c, r); 132 }); 133 } 134 135 @Override 136 public MultiResponse multi(RpcController controller, MultiRequest request) 137 throws ServiceException { 138 /** 139 * Here is for multiGet 140 */ 141 return this.doCall(controller, request, (c, r) -> { 142 return target.multi(c, r); 143 }); 144 } 145 146 @Override 147 public ScanResponse scan(RpcController controller, ScanRequest request) 148 throws ServiceException { 149 return this.doCall(controller, request, (c, r) -> { 150 return target.scan(c, r); 151 }); 152 } 153 154 interface Operation<REQUEST, RESPONSE> { 155 RESPONSE call(RpcController controller, REQUEST request) throws ServiceException; 156 } 157 158 private <REQUEST, RESPONSE> RESPONSE doCall(RpcController controller, REQUEST request, 159 Operation<REQUEST, RESPONSE> operation) throws ServiceException { 160 Optional<RpcCall> rpcCallOptional = RpcServer.unsetCurrentCall(); 161 try { 162 return operation.call(controller, request); 163 } finally { 164 rpcCallOptional.ifPresent(RpcServer::setCurrentCall); 165 } 166 } 167 168 @Override 169 public MutateResponse mutate(RpcController controller, MutateRequest request) 170 throws ServiceException { 171 return target.mutate(controller, request); 172 } 173 174 @Override 175 public BulkLoadHFileResponse bulkLoadHFile(RpcController controller, 176 BulkLoadHFileRequest request) throws ServiceException { 177 return target.bulkLoadHFile(controller, request); 178 } 179 180 @Override 181 public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller, 182 PrepareBulkLoadRequest request) throws ServiceException { 183 return target.prepareBulkLoad(controller, request); 184 } 185 186 @Override 187 public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller, 188 CleanupBulkLoadRequest request) throws ServiceException { 189 return target.cleanupBulkLoad(controller, request); 190 } 191 192 @Override 193 public CoprocessorServiceResponse execService(RpcController controller, 194 CoprocessorServiceRequest request) throws ServiceException { 195 return target.execService(controller, request); 196 } 197 198 @Override 199 public CoprocessorServiceResponse execRegionServerService(RpcController controller, 200 CoprocessorServiceRequest request) throws ServiceException { 201 return target.execRegionServerService(controller, request); 202 } 203 } 204 } 205 206 /** 207 * Creates a short-circuit connection that can bypass the RPC layer (serialization, 208 * deserialization, networking, etc..) when talking to a local server. 209 * @param conf the current configuration 210 * @param user the user the connection is for 211 * @param serverName the local server name 212 * @param admin the admin interface of the local server 213 * @param client the client interface of the local server 214 * @param registry the connection registry to be used, can be null 215 * @return an short-circuit connection. 216 * @throws IOException if IO failure occurred 217 */ 218 public static ClusterConnection createShortCircuitConnection(final Configuration conf, User user, 219 final ServerName serverName, final AdminService.BlockingInterface admin, 220 final ClientService.BlockingInterface client, ConnectionRegistry registry) throws IOException { 221 if (user == null) { 222 user = UserProvider.instantiate(conf).getCurrent(); 223 } 224 return new ShortCircuitingClusterConnection(conf, user, serverName, admin, client, registry); 225 } 226 227}