001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.coprocessor;
019
020import com.google.protobuf.ByteString;
021import com.google.protobuf.Message;
022import com.google.protobuf.RpcCallback;
023import com.google.protobuf.RpcController;
024import com.google.protobuf.Service;
025import java.io.IOException;
026import java.lang.reflect.InvocationTargetException;
027import java.lang.reflect.Method;
028import java.util.Collections;
029import org.apache.hadoop.hbase.CoprocessorEnvironment;
030import org.apache.hadoop.hbase.HBaseInterfaceAudience;
031import org.apache.hadoop.hbase.HConstants;
032import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
033import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
034import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
035import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
036import org.apache.hadoop.hbase.regionserver.Region;
037import org.apache.hadoop.hbase.regionserver.RowProcessor;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.apache.yetus.audience.InterfaceStability;
040
041/**
042 * This class demonstrates how to implement atomic read-modify-writes using
043 * {@link Region#processRowsWithLocks} and Coprocessor endpoints.
044 */
045@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
046@InterfaceStability.Evolving
047public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
048  extends RowProcessorService implements RegionCoprocessor {
049  private RegionCoprocessorEnvironment env;
050
051  /**
052   * Pass a processor to region to process multiple rows atomically. The RowProcessor
053   * implementations should be the inner classes of your RowProcessorEndpoint. This way the
054   * RowProcessor can be class-loaded with the Coprocessor endpoint together. See
055   * {@code TestRowProcessorEndpoint} for example. The request contains information for constructing
056   * processor (see {@link #constructRowProcessorFromRequest}. The processor object defines the
057   * read-modify-write procedure.
058   */
059  @Override
060  public void process(RpcController controller, ProcessRequest request,
061    RpcCallback<ProcessResponse> done) {
062    ProcessResponse resultProto = null;
063    try {
064      RowProcessor<S, T> processor = constructRowProcessorFromRequest(request);
065      Region region = env.getRegion();
066      long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
067      long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
068      region.processRowsWithLocks(processor, nonceGroup, nonce);
069      T result = processor.getResult();
070      ProcessResponse.Builder b = ProcessResponse.newBuilder();
071      b.setRowProcessorResult(result.toByteString());
072      resultProto = b.build();
073    } catch (Exception e) {
074      CoprocessorRpcUtils.setControllerException(controller, new IOException(e));
075    }
076    done.run(resultProto);
077  }
078
079  @Override
080  public Iterable<Service> getServices() {
081    return Collections.singleton(this);
082  }
083
084  /**
085   * Stores a reference to the coprocessor environment provided by the
086   * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
087   * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded on
088   * a table region, so always expects this to be an instance of
089   * {@link RegionCoprocessorEnvironment}.
090   * @param env the environment provided by the coprocessor host
091   * @throws IOException if the provided environment is not an instance of
092   *                     {@code RegionCoprocessorEnvironment}
093   */
094  @Override
095  public void start(CoprocessorEnvironment env) throws IOException {
096    if (env instanceof RegionCoprocessorEnvironment) {
097      this.env = (RegionCoprocessorEnvironment) env;
098    } else {
099      throw new CoprocessorException("Must be loaded on a table region!");
100    }
101  }
102
103  @Override
104  public void stop(CoprocessorEnvironment env) throws IOException {
105    // nothing to do
106  }
107
108  @SuppressWarnings("unchecked")
109  RowProcessor<S, T> constructRowProcessorFromRequest(ProcessRequest request) throws IOException {
110    String className = request.getRowProcessorClassName();
111    Class<?> cls;
112    try {
113      cls = Class.forName(className);
114      RowProcessor<S, T> ci = (RowProcessor<S, T>) cls.getDeclaredConstructor().newInstance();
115      if (request.hasRowProcessorInitializerMessageName()) {
116        Class<?> imn =
117          Class.forName(request.getRowProcessorInitializerMessageName()).asSubclass(Message.class);
118        Method m;
119        try {
120          m = imn.getMethod("parseFrom", ByteString.class);
121        } catch (SecurityException e) {
122          throw new IOException(e);
123        } catch (NoSuchMethodException e) {
124          throw new IOException(e);
125        }
126        S s;
127        try {
128          s = (S) m.invoke(null, request.getRowProcessorInitializerMessage());
129        } catch (IllegalArgumentException e) {
130          throw new IOException(e);
131        } catch (InvocationTargetException e) {
132          throw new IOException(e);
133        }
134        ci.initialize(s);
135      }
136      return ci;
137    } catch (Exception e) {
138      throw new IOException(e);
139    }
140  }
141}