001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetSocketAddress; 022import java.util.ArrayList; 023import java.util.List; 024import org.apache.hadoop.hbase.Cell; 025import org.apache.hadoop.hbase.CellScanner; 026import org.apache.hadoop.hbase.CellUtil; 027import org.apache.hadoop.hbase.DoNotRetryIOException; 028import org.apache.hadoop.hbase.ServerName; 029import org.apache.hadoop.hbase.security.User; 030import org.apache.hadoop.hbase.util.Threads; 031import org.apache.yetus.audience.InterfaceAudience; 032 033import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 034import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 035import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 036 037import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto; 038import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 039import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 040import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 041import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 042import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 044import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 045import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 046 047@InterfaceAudience.Private 048public class TestProtobufRpcServiceImpl implements BlockingInterface { 049 050 public static final BlockingService SERVICE = TestProtobufRpcProto 051 .newReflectiveBlockingService(new TestProtobufRpcServiceImpl()); 052 053 public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr) 054 throws IOException { 055 return newBlockingStub(client, addr, User.getCurrent()); 056 } 057 058 public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr, 059 User user) throws IOException { 060 return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel( 061 ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), user, 0)); 062 } 063 064 public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { 065 return TestProtobufRpcProto.newStub(client.createRpcChannel( 066 ServerName.valueOf(addr.getHostName(), addr.getPort(), System.currentTimeMillis()), 067 User.getCurrent(), 0)); 068 } 069 070 @Override 071 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) 072 throws ServiceException { 073 return EmptyResponseProto.getDefaultInstance(); 074 } 075 076 @Override 077 public EchoResponseProto echo(RpcController controller, EchoRequestProto request) 078 throws ServiceException { 079 if (controller instanceof HBaseRpcController) { 080 HBaseRpcController pcrc = (HBaseRpcController) controller; 081 // If cells, scan them to check we are able to iterate what we were given and since this is an 082 // echo, just put them back on the controller creating a new block. Tests our block building. 083 CellScanner cellScanner = pcrc.cellScanner(); 084 List<Cell> list = null; 085 if (cellScanner != null) { 086 list = new ArrayList<>(); 087 try { 088 while (cellScanner.advance()) { 089 list.add(cellScanner.current()); 090 } 091 } catch (IOException e) { 092 throw new ServiceException(e); 093 } 094 } 095 cellScanner = CellUtil.createCellScanner(list); 096 pcrc.setCellScanner(cellScanner); 097 } 098 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); 099 } 100 101 @Override 102 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) 103 throws ServiceException { 104 throw new ServiceException(new DoNotRetryIOException("server error!")); 105 } 106 107 @Override 108 public EmptyResponseProto pause(RpcController controller, PauseRequestProto request) 109 throws ServiceException { 110 Threads.sleepWithoutInterrupt(request.getMs()); 111 return EmptyResponseProto.getDefaultInstance(); 112 } 113 114 @Override 115 public AddrResponseProto addr(RpcController controller, EmptyRequestProto request) 116 throws ServiceException { 117 return AddrResponseProto.newBuilder() 118 .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build(); 119 } 120}