001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 */
019
020package org.apache.hadoop.hbase.ipc;
021
022import static org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;
023import java.io.IOException;
024import java.io.InterruptedIOException;
025
026import org.apache.hadoop.hbase.DoNotRetryIOException;
027import org.apache.hadoop.hbase.HConstants;
028import org.apache.yetus.audience.InterfaceAudience;
029import org.slf4j.Logger;
030import org.slf4j.LoggerFactory;
031import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
032import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
033import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
034import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
035import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
036import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
037import org.apache.hadoop.util.StringUtils;
038
039import com.google.protobuf.RpcCallback;
040import com.google.protobuf.RpcController;
041import com.google.protobuf.Descriptors;
042import com.google.protobuf.Descriptors.MethodDescriptor;
043import com.google.protobuf.Descriptors.ServiceDescriptor;
044import com.google.protobuf.Message;
045import com.google.protobuf.Service;
046
047import edu.umd.cs.findbugs.annotations.Nullable;
048
049/**
050 * Utilities for handling coprocessor rpc service calls.
051 */
052@InterfaceAudience.Private
053public final class CoprocessorRpcUtils {
054  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorRpcUtils.class);
055  /**
056   * We assume that all HBase protobuf services share a common package name
057   * (defined in the .proto files).
058   */
059  private static final String hbaseServicePackage;
060  static {
061    Descriptors.ServiceDescriptor clientService = ClientProtos.ClientService.getDescriptor();
062    hbaseServicePackage = clientService.getFullName()
063        .substring(0, clientService.getFullName().lastIndexOf(clientService.getName()));
064  }
065
066  private CoprocessorRpcUtils() {
067    // private for utility class
068  }
069
070  /**
071   * Returns the name to use for coprocessor service calls.  For core HBase services
072   * (in the hbase.pb protobuf package), this returns the unqualified name in order to provide
073   * backward compatibility across the package name change.  For all other services,
074   * the fully-qualified service name is used.
075   */
076  public static String getServiceName(Descriptors.ServiceDescriptor service) {
077    if (service.getFullName().startsWith(hbaseServicePackage)) {
078      return service.getName();
079    }
080    return service.getFullName();
081  }
082
083  public static CoprocessorServiceRequest getCoprocessorServiceRequest(
084      final Descriptors.MethodDescriptor method, final Message request) {
085    return getCoprocessorServiceRequest(method, request, HConstants.EMPTY_BYTE_ARRAY,
086        HConstants.EMPTY_BYTE_ARRAY);
087  }
088
089  public static CoprocessorServiceRequest getCoprocessorServiceRequest(
090      final Descriptors.MethodDescriptor method, final Message request, final byte [] row,
091      final byte [] regionName) {
092    return CoprocessorServiceRequest.newBuilder().setCall(
093        getCoprocessorServiceCall(method, request, row)).
094          setRegion(RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build();
095  }
096
097  private static CoprocessorServiceCall getCoprocessorServiceCall(
098      final Descriptors.MethodDescriptor method, final Message request, final byte [] row) {
099    return CoprocessorServiceCall.newBuilder()
100    .setRow(org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.unsafeWrap(row))
101    .setServiceName(CoprocessorRpcUtils.getServiceName(method.getService()))
102    .setMethodName(method.getName())
103    // TODO!!!!! Come back here after!!!!! This is a double copy of the request if I read
104    // it right copying from non-shaded to shaded version!!!!!! FIXXXXX!!!!!
105    .setRequest(org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.
106        unsafeWrap(request.toByteArray())).build();
107  }
108
109  public static MethodDescriptor getMethodDescriptor(final String methodName,
110      final ServiceDescriptor serviceDesc)
111  throws UnknownProtocolException {
112    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
113    if (methodDesc == null) {
114      throw new UnknownProtocolException("Unknown method " + methodName + " called on service " +
115          serviceDesc.getFullName());
116    }
117    return methodDesc;
118  }
119
120  public static Message getRequest(Service service,
121      Descriptors.MethodDescriptor methodDesc,
122      org.apache.hbase.thirdparty.com.google.protobuf.ByteString shadedRequest)
123  throws IOException {
124    Message.Builder builderForType =
125        service.getRequestPrototype(methodDesc).newBuilderForType();
126    org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builderForType,
127        // TODO: COPY FROM SHADED TO NON_SHADED. DO I HAVE TOO?
128        shadedRequest.toByteArray());
129    return builderForType.build();
130  }
131
132  public static Message getResponse(
133      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse
134        result,
135      com.google.protobuf.Message responsePrototype)
136  throws IOException {
137    Message response;
138    if (result.getValue().hasValue()) {
139      Message.Builder builder = responsePrototype.newBuilderForType();
140      builder.mergeFrom(result.getValue().getValue().newInput());
141      response = builder.build();
142    } else {
143      response = responsePrototype.getDefaultInstanceForType();
144    }
145    if (LOG.isTraceEnabled()) {
146      LOG.trace("Master Result is value=" + response);
147    }
148    return response;
149  }
150
151  public static org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.
152      CoprocessorServiceResponse getResponse(final Message result, final byte [] regionName) {
153    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.
154      CoprocessorServiceResponse.Builder builder =
155        org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse.
156        newBuilder();
157    builder.setRegion(RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME,
158      regionName));
159    // TODO: UGLY COPY IN HERE!!!!
160    builder.setValue(builder.getValueBuilder().setName(result.getClass().getName())
161        .setValue(org.apache.hbase.thirdparty.com.google.protobuf.ByteString.
162            copyFrom(result.toByteArray())));
163    return builder.build();
164  }
165
166  /**
167   * Simple {@link RpcCallback} implementation providing a
168   * {@link java.util.concurrent.Future}-like {@link BlockingRpcCallback#get()} method, which
169   * will block util the instance's {@link BlockingRpcCallback#run(Object)} method has been called.
170   * {@code R} is the RPC response type that will be passed to the {@link #run(Object)} method.
171   */
172  @InterfaceAudience.Private
173  // Copy of BlockingRpcCallback but deriving from RpcCallback non-shaded.
174  public static class BlockingRpcCallback<R> implements RpcCallback<R> {
175    private R result;
176    private boolean resultSet = false;
177
178    /**
179     * Called on completion of the RPC call with the response object, or {@code null} in the case of
180     * an error.
181     * @param parameter the response object or {@code null} if an error occurred
182     */
183    @Override
184    public void run(R parameter) {
185      synchronized (this) {
186        result = parameter;
187        resultSet = true;
188        this.notifyAll();
189      }
190    }
191
192    /**
193     * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
194     * passed.  When used asynchronously, this method will block until the {@link #run(Object)}
195     * method has been called.
196     * @return the response object or {@code null} if no response was passed
197     */
198    public synchronized R get() throws IOException {
199      while (!resultSet) {
200        try {
201          this.wait();
202        } catch (InterruptedException ie) {
203          InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
204          exception.initCause(ie);
205          throw exception;
206        }
207      }
208      return result;
209    }
210  }
211
212  /**
213   * Stores an exception encountered during RPC invocation so it can be passed back
214   * through to the client.
215   * @param controller the controller instance provided by the client when calling the service
216   * @param ioe the exception encountered
217   */
218  public static void setControllerException(RpcController controller, IOException ioe) {
219    if (controller == null) {
220      return;
221    }
222    if (controller instanceof org.apache.hadoop.hbase.ipc.ServerRpcController) {
223      ((ServerRpcController)controller).setFailedOn(ioe);
224    } else {
225      controller.setFailed(StringUtils.stringifyException(ioe));
226    }
227  }
228
229  /**
230   * Retreivies exception stored during RPC invocation.
231   * @param controller the controller instance provided by the client when calling the service
232   * @return exception if any, or null; Will return DoNotRetryIOException for string represented
233   * failure causes in controller.
234   */
235  @Nullable
236  public static IOException getControllerException(RpcController controller) throws IOException {
237    if (controller == null || !controller.failed()) {
238      return null;
239    }
240    if (controller instanceof ServerRpcController) {
241      return ((ServerRpcController)controller).getFailedOn();
242    }
243    return new DoNotRetryIOException(controller.errorText());
244  }
245}