001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetSocketAddress; 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.hbase.Cell; 025import org.apache.hadoop.hbase.CellScanner; 026import org.apache.hadoop.hbase.CellUtil; 027import org.apache.hadoop.hbase.DoNotRetryIOException; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.security.User; 030import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 031import org.apache.hadoop.hbase.util.Threads; 032import org.apache.yetus.audience.InterfaceAudience; 033 034import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 035import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 036import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 037 038import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto; 039import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 040import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 041import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 042import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 044import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 045import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 046import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 047 048@InterfaceAudience.Private 049public class TestProtobufRpcServiceImpl implements BlockingInterface { 050 051 public static final BlockingService SERVICE = 052 TestProtobufRpcProto.newReflectiveBlockingService(new TestProtobufRpcServiceImpl()); 053 054 public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr) 055 throws IOException { 056 return newBlockingStub(client, addr, User.getCurrent()); 057 } 058 059 public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr, 060 User user) throws IOException { 061 return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel( 062 ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()), 063 user, 0)); 064 } 065 066 public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { 067 return TestProtobufRpcProto.newStub(client.createRpcChannel( 068 ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()), 069 User.getCurrent(), 0)); 070 } 071 072 @Override 073 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) 074 throws ServiceException { 075 return EmptyResponseProto.getDefaultInstance(); 076 } 077 078 @Override 079 public EchoResponseProto echo(RpcController controller, EchoRequestProto request) 080 throws ServiceException { 081 if (controller instanceof HBaseRpcController) { 082 HBaseRpcController pcrc = (HBaseRpcController) controller; 083 // If cells, scan them to check we are able to iterate what we were given and since this is an 084 // echo, just put them back on the controller creating a new block. Tests our block building. 085 CellScanner cellScanner = pcrc.cellScanner(); 086 List<Cell> list = null; 087 if (cellScanner != null) { 088 list = new ArrayList<>(); 089 try { 090 while (cellScanner.advance()) { 091 list.add(cellScanner.current()); 092 } 093 } catch (IOException e) { 094 throw new ServiceException(e); 095 } 096 } 097 cellScanner = CellUtil.createCellScanner(list); 098 pcrc.setCellScanner(cellScanner); 099 } 100 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); 101 } 102 103 @Override 104 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) 105 throws ServiceException { 106 throw new ServiceException(new DoNotRetryIOException("server error!")); 107 } 108 109 @Override 110 public EmptyResponseProto pause(RpcController controller, PauseRequestProto request) 111 throws ServiceException { 112 Threads.sleepWithoutInterrupt(request.getMs()); 113 return EmptyResponseProto.getDefaultInstance(); 114 } 115 116 @Override 117 public AddrResponseProto addr(RpcController controller, EmptyRequestProto request) 118 throws ServiceException { 119 return AddrResponseProto.newBuilder() 120 .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build(); 121 } 122}