001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.client;
019
020import java.io.IOException;
021import java.util.Optional;
022import org.apache.hadoop.conf.Configuration;
023import org.apache.hadoop.hbase.ServerName;
024import org.apache.hadoop.hbase.coprocessor.RegionObserver;
025import org.apache.hadoop.hbase.ipc.RpcCall;
026import org.apache.hadoop.hbase.ipc.RpcServer;
027import org.apache.hadoop.hbase.ipc.ServerCall;
028import org.apache.hadoop.hbase.regionserver.RSRpcServices;
029import org.apache.hadoop.hbase.regionserver.RegionScanner;
030import org.apache.hadoop.hbase.security.User;
031import org.apache.hadoop.hbase.security.UserProvider;
032import org.apache.yetus.audience.InterfaceAudience;
033
034import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
035import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
036
037import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
038import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
039import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
040import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
041import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
044import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
045import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
046import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse;
047import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
048import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiResponse;
049import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
050import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse;
051import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
052import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
053import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
054import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
055import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
056
057@InterfaceAudience.Private
058public final class ServerConnectionUtils {
059
060  private ServerConnectionUtils() {
061  }
062
063  /**
064   * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
065   * if the invocation target is 'this' server; save on network and protobuf invocations.
066   */
067  // TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
068  // Class is visible so can assert we are short-circuiting when expected.
069  public final static class ShortCircuitingClusterConnection extends ConnectionImplementation {
070    private final ServerName serverName;
071    private final AdminService.BlockingInterface localHostAdmin;
072    private final ClientService.BlockingInterface localHostClient;
073    private final ClientService.BlockingInterface localClientServiceBlockingInterfaceWrapper;
074
075    private ShortCircuitingClusterConnection(Configuration conf, User user, ServerName serverName,
076      AdminService.BlockingInterface admin, ClientService.BlockingInterface client,
077      ConnectionRegistry registry) throws IOException {
078      super(conf, null, user, registry);
079      this.serverName = serverName;
080      this.localHostAdmin = admin;
081      this.localHostClient = client;
082      this.localClientServiceBlockingInterfaceWrapper =
083        new ClientServiceBlockingInterfaceWrapper(this.localHostClient);
084    }
085
086    @Override
087    public AdminService.BlockingInterface getAdmin(ServerName sn) throws IOException {
088      return serverName.equals(sn) ? this.localHostAdmin : super.getAdmin(sn);
089    }
090
091    @Override
092    public ClientService.BlockingInterface getClient(ServerName sn) throws IOException {
093      return serverName.equals(sn)
094        ? this.localClientServiceBlockingInterfaceWrapper
095        : super.getClient(sn);
096    }
097
098    @Override
099    public MasterKeepAliveConnection getMaster() throws IOException {
100      if (this.localHostClient instanceof MasterService.BlockingInterface) {
101        return new ShortCircuitMasterConnection(
102          (MasterService.BlockingInterface) this.localHostClient);
103      }
104      return super.getMaster();
105    }
106
107    /**
108     * When we directly invoke {@link RSRpcServices#get} on the same RegionServer through
109     * {@link ShortCircuitingClusterConnection} in region CPs such as
110     * {@link RegionObserver#postScannerOpen} to get other rows, the {@link RegionScanner} created
111     * for the directly {@link RSRpcServices#get} may not be closed until the outmost rpc call is
112     * completed if there is an outmost {@link RpcCall}, and even worse , the
113     * {@link ServerCall#rpcCallback} may be override which would cause serious problem,so for
114     * {@link ShortCircuitingClusterConnection#getClient}, if return
115     * {@link ShortCircuitingClusterConnection#localHostClient},we would add a wrapper class to wrap
116     * it , which using {@link RpcServer#unsetCurrentCall} and {RpcServer#setCurrentCall} to
117     * surround the scan and get method call,so the {@link RegionScanner} created for the directly
118     * {@link RSRpcServices#get} could be closed immediately,see HBASE-26812 for more.
119     */
120    static class ClientServiceBlockingInterfaceWrapper implements ClientService.BlockingInterface {
121
122      private ClientService.BlockingInterface target;
123
124      ClientServiceBlockingInterfaceWrapper(ClientService.BlockingInterface target) {
125        this.target = target;
126      }
127
128      @Override
129      public GetResponse get(RpcController controller, GetRequest request) throws ServiceException {
130        return this.doCall(controller, request, (c, r) -> {
131          return target.get(c, r);
132        });
133      }
134
135      @Override
136      public MultiResponse multi(RpcController controller, MultiRequest request)
137        throws ServiceException {
138        /**
139         * Here is for multiGet
140         */
141        return this.doCall(controller, request, (c, r) -> {
142          return target.multi(c, r);
143        });
144      }
145
146      @Override
147      public ScanResponse scan(RpcController controller, ScanRequest request)
148        throws ServiceException {
149        return this.doCall(controller, request, (c, r) -> {
150          return target.scan(c, r);
151        });
152      }
153
154      interface Operation<REQUEST, RESPONSE> {
155        RESPONSE call(RpcController controller, REQUEST request) throws ServiceException;
156      }
157
158      private <REQUEST, RESPONSE> RESPONSE doCall(RpcController controller, REQUEST request,
159        Operation<REQUEST, RESPONSE> operation) throws ServiceException {
160        Optional<RpcCall> rpcCallOptional = RpcServer.unsetCurrentCall();
161        try {
162          return operation.call(controller, request);
163        } finally {
164          rpcCallOptional.ifPresent(RpcServer::setCurrentCall);
165        }
166      }
167
168      @Override
169      public MutateResponse mutate(RpcController controller, MutateRequest request)
170        throws ServiceException {
171        return target.mutate(controller, request);
172      }
173
174      @Override
175      public BulkLoadHFileResponse bulkLoadHFile(RpcController controller,
176        BulkLoadHFileRequest request) throws ServiceException {
177        return target.bulkLoadHFile(controller, request);
178      }
179
180      @Override
181      public PrepareBulkLoadResponse prepareBulkLoad(RpcController controller,
182        PrepareBulkLoadRequest request) throws ServiceException {
183        return target.prepareBulkLoad(controller, request);
184      }
185
186      @Override
187      public CleanupBulkLoadResponse cleanupBulkLoad(RpcController controller,
188        CleanupBulkLoadRequest request) throws ServiceException {
189        return target.cleanupBulkLoad(controller, request);
190      }
191
192      @Override
193      public CoprocessorServiceResponse execService(RpcController controller,
194        CoprocessorServiceRequest request) throws ServiceException {
195        return target.execService(controller, request);
196      }
197
198      @Override
199      public CoprocessorServiceResponse execRegionServerService(RpcController controller,
200        CoprocessorServiceRequest request) throws ServiceException {
201        return target.execRegionServerService(controller, request);
202      }
203    }
204  }
205
206  /**
207   * Creates a short-circuit connection that can bypass the RPC layer (serialization,
208   * deserialization, networking, etc..) when talking to a local server.
209   * @param conf       the current configuration
210   * @param user       the user the connection is for
211   * @param serverName the local server name
212   * @param admin      the admin interface of the local server
213   * @param client     the client interface of the local server
214   * @param registry   the connection registry to be used, can be null
215   * @return an short-circuit connection.
216   * @throws IOException if IO failure occurred
217   */
218  public static ClusterConnection createShortCircuitConnection(final Configuration conf, User user,
219    final ServerName serverName, final AdminService.BlockingInterface admin,
220    final ClientService.BlockingInterface client, ConnectionRegistry registry) throws IOException {
221    if (user == null) {
222      user = UserProvider.instantiate(conf).getCurrent();
223    }
224    return new ShortCircuitingClusterConnection(conf, user, serverName, admin, client, registry);
225  }
226
227}