001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import java.io.IOException; 021import java.net.InetSocketAddress; 022import java.util.ArrayList; 023import java.util.List; 024import java.util.concurrent.Executors; 025import java.util.concurrent.ScheduledExecutorService; 026import java.util.concurrent.TimeUnit; 027import org.apache.hadoop.hbase.DoNotRetryIOException; 028import org.apache.hadoop.hbase.ExtendedCell; 029import org.apache.hadoop.hbase.ExtendedCellScanner; 030import org.apache.hadoop.hbase.PrivateCellUtil; 031import org.apache.hadoop.hbase.ServerName; 032import org.apache.hadoop.hbase.security.User; 033import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; 034import org.apache.hadoop.hbase.util.Threads; 035import org.apache.yetus.audience.InterfaceAudience; 036 037import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; 038import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService; 039import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; 040import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; 041import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; 042 043import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.AddrResponseProto; 044import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto; 045import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto; 046import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto; 047import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyResponseProto; 048import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.PauseRequestProto; 049import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto; 050import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface; 051import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface; 052 053@InterfaceAudience.Private 054public class TestProtobufRpcServiceImpl implements BlockingInterface, Interface { 055 056 public static final BlockingService SERVICE = 057 TestProtobufRpcProto.newReflectiveBlockingService(new TestProtobufRpcServiceImpl()); 058 059 public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr) 060 throws IOException { 061 return newBlockingStub(client, addr, User.getCurrent()); 062 } 063 064 public static BlockingInterface newBlockingStub(RpcClient client, InetSocketAddress addr, 065 User user) throws IOException { 066 return TestProtobufRpcProto.newBlockingStub(client.createBlockingRpcChannel( 067 ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()), 068 user, 0)); 069 } 070 071 public static Interface newStub(RpcClient client, InetSocketAddress addr) throws IOException { 072 return TestProtobufRpcProto.newStub(client.createRpcChannel( 073 ServerName.valueOf(addr.getHostName(), addr.getPort(), EnvironmentEdgeManager.currentTime()), 074 User.getCurrent(), 0)); 075 } 076 077 @Override 078 public EmptyResponseProto ping(RpcController controller, EmptyRequestProto request) 079 throws ServiceException { 080 return EmptyResponseProto.getDefaultInstance(); 081 } 082 083 @Override 084 public EchoResponseProto echo(RpcController controller, EchoRequestProto request) 085 throws ServiceException { 086 if (controller instanceof HBaseRpcController) { 087 HBaseRpcController pcrc = (HBaseRpcController) controller; 088 // If cells, scan them to check we are able to iterate what we were given and since this is an 089 // echo, just put them back on the controller creating a new block. Tests our block building. 090 ExtendedCellScanner cellScanner = pcrc.cellScanner(); 091 List<ExtendedCell> list = null; 092 if (cellScanner != null) { 093 list = new ArrayList<>(); 094 try { 095 while (cellScanner.advance()) { 096 list.add(cellScanner.current()); 097 } 098 } catch (IOException e) { 099 throw new ServiceException(e); 100 } 101 } 102 cellScanner = PrivateCellUtil.createExtendedCellScanner(list); 103 pcrc.setCellScanner(cellScanner); 104 } 105 return EchoResponseProto.newBuilder().setMessage(request.getMessage()).build(); 106 } 107 108 @Override 109 public EmptyResponseProto error(RpcController controller, EmptyRequestProto request) 110 throws ServiceException { 111 throw new ServiceException(new DoNotRetryIOException("server error!")); 112 } 113 114 @Override 115 public EmptyResponseProto pause(RpcController controller, PauseRequestProto request) 116 throws ServiceException { 117 Threads.sleepWithoutInterrupt(request.getMs()); 118 return EmptyResponseProto.getDefaultInstance(); 119 } 120 121 @Override 122 public AddrResponseProto addr(RpcController controller, EmptyRequestProto request) 123 throws ServiceException { 124 return AddrResponseProto.newBuilder() 125 .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build(); 126 } 127 128 @Override 129 public void ping(RpcController controller, EmptyRequestProto request, 130 RpcCallback<EmptyResponseProto> done) { 131 done.run(EmptyResponseProto.getDefaultInstance()); 132 } 133 134 @Override 135 public void echo(RpcController controller, EchoRequestProto request, 136 RpcCallback<EchoResponseProto> done) { 137 if (controller instanceof HBaseRpcController) { 138 HBaseRpcController pcrc = (HBaseRpcController) controller; 139 // If cells, scan them to check we are able to iterate what we were given and since this is an 140 // echo, just put them back on the controller creating a new block. Tests our block building. 141 ExtendedCellScanner cellScanner = pcrc.cellScanner(); 142 List<ExtendedCell> list = null; 143 if (cellScanner != null) { 144 list = new ArrayList<>(); 145 try { 146 while (cellScanner.advance()) { 147 list.add(cellScanner.current()); 148 } 149 } catch (IOException e) { 150 pcrc.setFailed(e); 151 return; 152 } 153 } 154 cellScanner = PrivateCellUtil.createExtendedCellScanner(list); 155 pcrc.setCellScanner(cellScanner); 156 } 157 done.run(EchoResponseProto.newBuilder().setMessage(request.getMessage()).build()); 158 } 159 160 @Override 161 public void error(RpcController controller, EmptyRequestProto request, 162 RpcCallback<EmptyResponseProto> done) { 163 if (controller instanceof HBaseRpcController) { 164 ((HBaseRpcController) controller).setFailed(new DoNotRetryIOException("server error!")); 165 } else { 166 controller.setFailed("server error!"); 167 } 168 } 169 170 private final ScheduledExecutorService executor = 171 Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build()); 172 173 @Override 174 public void pause(RpcController controller, PauseRequestProto request, 175 RpcCallback<EmptyResponseProto> done) { 176 executor.schedule(() -> done.run(EmptyResponseProto.getDefaultInstance()), request.getMs(), 177 TimeUnit.MILLISECONDS); 178 } 179 180 @Override 181 public void addr(RpcController controller, EmptyRequestProto request, 182 RpcCallback<AddrResponseProto> done) { 183 done.run(AddrResponseProto.newBuilder() 184 .setAddr(RpcServer.getRemoteAddress().get().getHostAddress()).build()); 185 } 186}