001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.ipc; 019 020import static org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME; 021 022import com.google.protobuf.Descriptors; 023import com.google.protobuf.Descriptors.MethodDescriptor; 024import com.google.protobuf.Descriptors.ServiceDescriptor; 025import com.google.protobuf.Message; 026import com.google.protobuf.RpcCallback; 027import com.google.protobuf.RpcController; 028import com.google.protobuf.Service; 029import edu.umd.cs.findbugs.annotations.Nullable; 030import java.io.IOException; 031import java.io.InterruptedIOException; 032import org.apache.hadoop.hbase.DoNotRetryIOException; 033import org.apache.hadoop.hbase.HConstants; 034import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; 035import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; 036import org.apache.hadoop.util.StringUtils; 037import org.apache.yetus.audience.InterfaceAudience; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; 042import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall; 043import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest; 044import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; 045 046/** 047 * Utilities for handling coprocessor rpc service calls. 048 */ 049@InterfaceAudience.Private 050public final class CoprocessorRpcUtils { 051 private static final Logger LOG = LoggerFactory.getLogger(CoprocessorRpcUtils.class); 052 /** 053 * We assume that all HBase protobuf services share a common package name (defined in the .proto 054 * files). 055 */ 056 private static final String hbaseServicePackage; 057 static { 058 Descriptors.ServiceDescriptor clientService = ClientProtos.ClientService.getDescriptor(); 059 hbaseServicePackage = clientService.getFullName().substring(0, 060 clientService.getFullName().lastIndexOf(clientService.getName())); 061 } 062 063 private CoprocessorRpcUtils() { 064 // private for utility class 065 } 066 067 /** 068 * Returns the name to use for coprocessor service calls. For core HBase services (in the hbase.pb 069 * protobuf package), this returns the unqualified name in order to provide backward compatibility 070 * across the package name change. For all other services, the fully-qualified service name is 071 * used. 072 */ 073 public static String getServiceName(Descriptors.ServiceDescriptor service) { 074 if (service.getFullName().startsWith(hbaseServicePackage)) { 075 return service.getName(); 076 } 077 return service.getFullName(); 078 } 079 080 public static CoprocessorServiceRequest 081 getCoprocessorServiceRequest(final Descriptors.MethodDescriptor method, final Message request) { 082 return getCoprocessorServiceRequest(method, request, HConstants.EMPTY_BYTE_ARRAY, 083 HConstants.EMPTY_BYTE_ARRAY); 084 } 085 086 public static CoprocessorServiceRequest getCoprocessorServiceRequest( 087 final Descriptors.MethodDescriptor method, final Message request, final byte[] row, 088 final byte[] regionName) { 089 return CoprocessorServiceRequest.newBuilder() 090 .setCall(getCoprocessorServiceCall(method, request, row)) 091 .setRegion(RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build(); 092 } 093 094 private static CoprocessorServiceCall getCoprocessorServiceCall( 095 final Descriptors.MethodDescriptor method, final Message request, final byte[] row) { 096 return CoprocessorServiceCall.newBuilder() 097 .setRow(org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.unsafeWrap(row)) 098 .setServiceName(CoprocessorRpcUtils.getServiceName(method.getService())) 099 .setMethodName(method.getName()) 100 // TODO!!!!! Come back here after!!!!! This is a double copy of the request if I read 101 // it right copying from non-shaded to shaded version!!!!!! FIXXXXX!!!!! 102 .setRequest(org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations 103 .unsafeWrap(request.toByteArray())) 104 .build(); 105 } 106 107 public static MethodDescriptor getMethodDescriptor(final String methodName, 108 final ServiceDescriptor serviceDesc) throws UnknownProtocolException { 109 Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName); 110 if (methodDesc == null) { 111 throw new UnknownProtocolException( 112 "Unknown method " + methodName + " called on service " + serviceDesc.getFullName()); 113 } 114 return methodDesc; 115 } 116 117 public static Message getRequest(Service service, Descriptors.MethodDescriptor methodDesc, 118 org.apache.hbase.thirdparty.com.google.protobuf.ByteString shadedRequest) throws IOException { 119 Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType(); 120 org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builderForType, 121 // TODO: COPY FROM SHADED TO NON_SHADED. DO I HAVE TOO? 122 shadedRequest.toByteArray()); 123 return builderForType.build(); 124 } 125 126 public static Message getResponse( 127 org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse result, 128 com.google.protobuf.Message responsePrototype) throws IOException { 129 Message response; 130 if (result.getValue().hasValue()) { 131 Message.Builder builder = responsePrototype.newBuilderForType(); 132 builder.mergeFrom(result.getValue().getValue().newInput()); 133 response = builder.build(); 134 } else { 135 response = responsePrototype.getDefaultInstanceForType(); 136 } 137 if (LOG.isTraceEnabled()) { 138 LOG.trace("Master Result is value=" + response); 139 } 140 return response; 141 } 142 143 public static 144 org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse 145 getResponse(final Message result, final byte[] regionName) { 146 org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse.Builder builder = 147 org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse 148 .newBuilder(); 149 builder.setRegion( 150 RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName)); 151 // TODO: UGLY COPY IN HERE!!!! 152 builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue( 153 org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray()))); 154 return builder.build(); 155 } 156 157 /** 158 * Simple {@link RpcCallback} implementation providing a {@link java.util.concurrent.Future}-like 159 * {@link BlockingRpcCallback#get()} method, which will block util the instance's 160 * {@link BlockingRpcCallback#run(Object)} method has been called. {@code R} is the RPC response 161 * type that will be passed to the {@link #run(Object)} method. 162 */ 163 @InterfaceAudience.Private 164 // Copy of BlockingRpcCallback but deriving from RpcCallback non-shaded. 165 public static class BlockingRpcCallback<R> implements RpcCallback<R> { 166 private R result; 167 private boolean resultSet = false; 168 169 /** 170 * Called on completion of the RPC call with the response object, or {@code null} in the case of 171 * an error. 172 * @param parameter the response object or {@code null} if an error occurred 173 */ 174 @Override 175 public void run(R parameter) { 176 synchronized (this) { 177 result = parameter; 178 resultSet = true; 179 this.notifyAll(); 180 } 181 } 182 183 /** 184 * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was 185 * passed. When used asynchronously, this method will block until the {@link #run(Object)} 186 * method has been called. 187 * @return the response object or {@code null} if no response was passed 188 */ 189 public synchronized R get() throws IOException { 190 while (!resultSet) { 191 try { 192 this.wait(); 193 } catch (InterruptedException ie) { 194 InterruptedIOException exception = new InterruptedIOException(ie.getMessage()); 195 exception.initCause(ie); 196 throw exception; 197 } 198 } 199 return result; 200 } 201 } 202 203 /** 204 * Stores an exception encountered during RPC invocation so it can be passed back through to the 205 * client. 206 * @param controller the controller instance provided by the client when calling the service 207 * @param ioe the exception encountered 208 */ 209 public static void setControllerException(RpcController controller, IOException ioe) { 210 if (controller == null) { 211 return; 212 } 213 if (controller instanceof org.apache.hadoop.hbase.ipc.ServerRpcController) { 214 ((ServerRpcController) controller).setFailedOn(ioe); 215 } else { 216 controller.setFailed(StringUtils.stringifyException(ioe)); 217 } 218 } 219 220 /** 221 * Retreivies exception stored during RPC invocation. 222 * @param controller the controller instance provided by the client when calling the service 223 * @return exception if any, or null; Will return DoNotRetryIOException for string represented 224 * failure causes in controller. 225 */ 226 @Nullable 227 public static IOException getControllerException(RpcController controller) throws IOException { 228 if (controller == null || !controller.failed()) { 229 return null; 230 } 231 if (controller instanceof ServerRpcController) { 232 return ((ServerRpcController) controller).getFailedOn(); 233 } 234 return new DoNotRetryIOException(controller.errorText()); 235 } 236}