/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.ipc;

import static org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME;

import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Descriptors.ServiceDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.io.InterruptedIOException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;

/**
 * Utilities for handling coprocessor RPC service calls.
 */
@InterfaceAudience.Private
public final class CoprocessorRpcUtils {
  private static final Logger LOG = LoggerFactory.getLogger(CoprocessorRpcUtils.class);
  /**
   * We assume that all HBase protobuf services share a common package name (defined in the .proto
   * files).
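   * <p>
   * With the current .proto definitions this prefix works out to {@code hbase.pb.} (including the
   * trailing dot), derived below from the full name of {@code ClientService}.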
   */
  private static final String hbaseServicePackage;
  static {
    Descriptors.ServiceDescriptor clientService = ClientProtos.ClientService.getDescriptor();
    hbaseServicePackage = clientService.getFullName().substring(0,
      clientService.getFullName().lastIndexOf(clientService.getName()));
  }

  private CoprocessorRpcUtils() {
    // private for utility class
  }

  /**
   * Returns the name to use for coprocessor service calls. For core HBase services (in the hbase.pb
   * protobuf package), this returns the unqualified name in order to provide backward compatibility
   * across the package name change. For all other services, the fully-qualified service name is
   * used.
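   * <p>
   * Illustrative sketch; {@code SumService} below is a hypothetical user-defined coprocessor
   * service, not part of HBase:
   *
   * <pre>
   * // Core service in the hbase.pb package: unqualified name, e.g. "ClientService".
   * String core = CoprocessorRpcUtils.getServiceName(ClientProtos.ClientService.getDescriptor());
   * // Any other service: the fully-qualified name, e.g. "my.pb.SumService".
   * String custom = CoprocessorRpcUtils.getServiceName(SumService.getDescriptor());
   * </pre>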
   */
  public static String getServiceName(Descriptors.ServiceDescriptor service) {
    if (service.getFullName().startsWith(hbaseServicePackage)) {
      return service.getName();
    }
    return service.getFullName();
  }

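  /**
   * Builds a {@link CoprocessorServiceRequest} for the given method and request, delegating to
   * {@link #getCoprocessorServiceRequest(Descriptors.MethodDescriptor, Message, byte[], byte[])}
   * with an empty row and an empty region name.
   */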
  public static CoprocessorServiceRequest
    getCoprocessorServiceRequest(final Descriptors.MethodDescriptor method, final Message request) {
    return getCoprocessorServiceRequest(method, request, HConstants.EMPTY_BYTE_ARRAY,
      HConstants.EMPTY_BYTE_ARRAY);
  }

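  /**
   * Builds a {@link CoprocessorServiceRequest} carrying the (shaded) service call for the given
   * method, request, row and region name.
   * <p>
   * Rough usage sketch; {@code SumService}, {@code sumRequest}, {@code row} and
   * {@code regionName} are illustrative assumptions, not part of this API:
   *
   * <pre>
   * Descriptors.MethodDescriptor method = SumService.getDescriptor().findMethodByName("getSum");
   * CoprocessorServiceRequest csr =
   *   CoprocessorRpcUtils.getCoprocessorServiceRequest(method, sumRequest, row, regionName);
   * </pre>
   */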
  public static CoprocessorServiceRequest getCoprocessorServiceRequest(
    final Descriptors.MethodDescriptor method, final Message request, final byte[] row,
    final byte[] regionName) {
    return CoprocessorServiceRequest.newBuilder()
      .setCall(getCoprocessorServiceCall(method, request, row))
      .setRegion(RequestConverter.buildRegionSpecifier(REGION_NAME, regionName)).build();
  }

  private static CoprocessorServiceCall getCoprocessorServiceCall(
    final Descriptors.MethodDescriptor method, final Message request, final byte[] row) {
    return CoprocessorServiceCall.newBuilder()
      .setRow(org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations.unsafeWrap(row))
      .setServiceName(CoprocessorRpcUtils.getServiceName(method.getService()))
      .setMethodName(method.getName())
      // TODO: This looks like a double copy of the request when converting from the non-shaded
      // to the shaded protobuf; avoid the extra copy if possible.
      .setRequest(org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations
        .unsafeWrap(request.toByteArray()))
      .build();
  }

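  /**
   * Looks up the {@link MethodDescriptor} for {@code methodName} on the given service descriptor.
   * @throws UnknownProtocolException if the service does not define a method of that name
   */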
  public static MethodDescriptor getMethodDescriptor(final String methodName,
    final ServiceDescriptor serviceDesc) throws UnknownProtocolException {
    Descriptors.MethodDescriptor methodDesc = serviceDesc.findMethodByName(methodName);
    if (methodDesc == null) {
      throw new UnknownProtocolException(
        "Unknown method " + methodName + " called on service " + serviceDesc.getFullName());
    }
    return methodDesc;
  }

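  /**
   * Rebuilds the non-shaded request {@link Message} for {@code methodDesc} from the serialized
   * bytes carried in the shaded {@code shadedRequest}.
   */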
  public static Message getRequest(Service service, Descriptors.MethodDescriptor methodDesc,
    org.apache.hbase.thirdparty.com.google.protobuf.ByteString shadedRequest) throws IOException {
    Message.Builder builderForType = service.getRequestPrototype(methodDesc).newBuilderForType();
    org.apache.hadoop.hbase.protobuf.ProtobufUtil.mergeFrom(builderForType,
      // TODO: This copies from the shaded to the non-shaded protobuf; can the copy be avoided?
      shadedRequest.toByteArray());
    return builderForType.build();
  }

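  /**
   * Extracts the non-shaded response {@link Message} from a shaded
   * {@code CoprocessorServiceResponse}, or returns the default instance of
   * {@code responsePrototype} if the response carries no value.
   */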
  public static Message getResponse(
    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse result,
    com.google.protobuf.Message responsePrototype) throws IOException {
    Message response;
    if (result.getValue().hasValue()) {
      Message.Builder builder = responsePrototype.newBuilderForType();
      builder.mergeFrom(result.getValue().getValue().newInput());
      response = builder.build();
    } else {
      response = responsePrototype.getDefaultInstanceForType();
    }
    if (LOG.isTraceEnabled()) {
      LOG.trace("Master Result is value=" + response);
    }
    return response;
  }

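  /**
   * Wraps a non-shaded result {@link Message} into a shaded {@code CoprocessorServiceResponse}
   * for the given region, storing the result's class name alongside its serialized bytes.
   */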
  public static
    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse
    getResponse(final Message result, final byte[] regionName) {
    org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse.Builder builder =
      org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceResponse
        .newBuilder();
    builder.setRegion(
      RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName));
    // TODO: Avoid this extra copy of the result into the shaded ByteString.
    builder.setValue(builder.getValueBuilder().setName(result.getClass().getName()).setValue(
      org.apache.hbase.thirdparty.com.google.protobuf.ByteString.copyFrom(result.toByteArray())));
    return builder.build();
  }

  /**
   * Simple {@link RpcCallback} implementation providing a {@link java.util.concurrent.Future}-like
   * {@link BlockingRpcCallback#get()} method, which will block until the instance's
   * {@link BlockingRpcCallback#run(Object)} method has been called. {@code R} is the RPC response
   * type that will be passed to the {@link #run(Object)} method.
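   * <p>
   * Typical usage sketch; the stub, method and {@code SumResponse} type below are illustrative
   * assumptions, not part of this class:
   *
   * <pre>
   * CoprocessorRpcUtils.BlockingRpcCallback&lt;SumResponse&gt; callback =
   *   new CoprocessorRpcUtils.BlockingRpcCallback&lt;&gt;();
   * stub.getSum(controller, sumRequest, callback);
   * SumResponse response = callback.get(); // blocks until run(...) has been called
   * </pre>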
   */
  @InterfaceAudience.Private
  // Copy of BlockingRpcCallback, but implementing the non-shaded RpcCallback.
  public static class BlockingRpcCallback<R> implements RpcCallback<R> {
    private R result;
    private boolean resultSet = false;

    /**
     * Called on completion of the RPC call with the response object, or {@code null} in the case of
     * an error.
     * @param parameter the response object or {@code null} if an error occurred
     */
    @Override
    public void run(R parameter) {
      synchronized (this) {
        result = parameter;
        resultSet = true;
        this.notifyAll();
      }
    }

    /**
     * Returns the parameter passed to {@link #run(Object)} or {@code null} if a null value was
     * passed. When used asynchronously, this method will block until the {@link #run(Object)}
     * method has been called.
     * @return the response object or {@code null} if no response was passed
     */
    public synchronized R get() throws IOException {
      while (!resultSet) {
        try {
          this.wait();
        } catch (InterruptedException ie) {
          InterruptedIOException exception = new InterruptedIOException(ie.getMessage());
          exception.initCause(ie);
          throw exception;
        }
      }
      return result;
    }
  }

  /**
   * Stores an exception encountered during RPC invocation so it can be passed back through to the
   * client.
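   * <p>
   * Sketch of typical use inside a coprocessor endpoint implementation (the surrounding service
   * method is assumed for illustration):
   *
   * <pre>
   * try {
   *   // ... do the actual work for this RPC ...
   * } catch (IOException ioe) {
   *   CoprocessorRpcUtils.setControllerException(controller, ioe);
   * }
   * </pre>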
   * @param controller the controller instance provided by the client when calling the service
   * @param ioe        the exception encountered
   */
  public static void setControllerException(RpcController controller, IOException ioe) {
    if (controller == null) {
      return;
    }
    if (controller instanceof org.apache.hadoop.hbase.ipc.ServerRpcController) {
      ((ServerRpcController) controller).setFailedOn(ioe);
    } else {
      controller.setFailed(StringUtils.stringifyException(ioe));
    }
  }

  /**
   * Retrieves the exception stored during RPC invocation.
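   * <p>
   * Illustrative caller-side check once the RPC has completed:
   *
   * <pre>
   * IOException failure = CoprocessorRpcUtils.getControllerException(controller);
   * if (failure != null) {
   *   throw failure;
   * }
   * </pre>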
   * @param controller the controller instance provided by the client when calling the service
   * @return the stored exception if any, otherwise {@code null}; failure causes represented only
   *         as strings in the controller are returned as a {@link DoNotRetryIOException}
   */
  @Nullable
  public static IOException getControllerException(RpcController controller) throws IOException {
    if (controller == null || !controller.failed()) {
      return null;
    }
    if (controller instanceof ServerRpcController) {
      return ((ServerRpcController) controller).getFailedOn();
    }
    return new DoNotRetryIOException(controller.errorText());
  }
}