View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.coprocessor;
19  
20  import java.io.IOException;
21  import java.lang.reflect.InvocationTargetException;
22  import java.lang.reflect.Method;
23  
24  import org.apache.hadoop.hbase.classification.InterfaceAudience;
25  import org.apache.hadoop.hbase.classification.InterfaceStability;
26  import org.apache.hadoop.hbase.Coprocessor;
27  import org.apache.hadoop.hbase.CoprocessorEnvironment;
28  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29  import org.apache.hadoop.hbase.HConstants;
30  import org.apache.hadoop.hbase.protobuf.ResponseConverter;
31  import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
32  import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
33  import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
34  import org.apache.hadoop.hbase.regionserver.HRegion;
35  import org.apache.hadoop.hbase.regionserver.RowProcessor;
36  
37  import com.google.protobuf.ByteString;
38  import com.google.protobuf.Message;
39  import com.google.protobuf.RpcCallback;
40  import com.google.protobuf.RpcController;
41  import com.google.protobuf.Service;
42  
43  /**
44   * This class demonstrates how to implement atomic read-modify-writes
45   * using {@link HRegion#processRowsWithLocks} and Coprocessor endpoints.
46   */
47  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
48  @InterfaceStability.Evolving
49  public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message> 
50  extends RowProcessorService implements CoprocessorService, Coprocessor {
51    private RegionCoprocessorEnvironment env;
52    /**
53     * Pass a processor to HRegion to process multiple rows atomically.
54     * 
55     * The RowProcessor implementations should be the inner classes of your
56     * RowProcessorEndpoint. This way the RowProcessor can be class-loaded with
57     * the Coprocessor endpoint together.
58     *
59     * See {@code TestRowProcessorEndpoint} for example.
60     *
61     * The request contains information for constructing processor 
62     * (see {@link #constructRowProcessorFromRequest}. The processor object defines
63     * the read-modify-write procedure.
64     */
65    @Override
66    public void process(RpcController controller, ProcessRequest request,
67        RpcCallback<ProcessResponse> done) {
68      ProcessResponse resultProto = null;
69      try {
70        RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
71        HRegion region = env.getRegion();
72        long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
73        long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
74        region.processRowsWithLocks(processor, nonceGroup, nonce);
75        T result = processor.getResult();
76        ProcessResponse.Builder b = ProcessResponse.newBuilder();
77        b.setRowProcessorResult(result.toByteString());
78        resultProto = b.build();
79      } catch (Exception e) {
80        ResponseConverter.setControllerException(controller, new IOException(e));
81      }
82      done.run(resultProto);
83    }
84  
85    @Override
86    public Service getService() {
87      return this;
88    }
89  
90    /**
91     * Stores a reference to the coprocessor environment provided by the
92     * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this
93     * coprocessor is loaded.  Since this is a coprocessor endpoint, it always expects to be loaded
94     * on a table region, so always expects this to be an instance of
95     * {@link RegionCoprocessorEnvironment}.
96     * @param env the environment provided by the coprocessor host
97     * @throws IOException if the provided environment is not an instance of
98     * {@code RegionCoprocessorEnvironment}
99     */
100   @Override
101   public void start(CoprocessorEnvironment env) throws IOException {
102     if (env instanceof RegionCoprocessorEnvironment) {
103       this.env = (RegionCoprocessorEnvironment)env;
104     } else {
105       throw new CoprocessorException("Must be loaded on a table region!");
106     }
107   }
108 
109   @Override
110   public void stop(CoprocessorEnvironment env) throws IOException {
111     // nothing to do
112   }
113 
114   @SuppressWarnings("unchecked")
115   RowProcessor<S,T> constructRowProcessorFromRequest(ProcessRequest request)
116       throws IOException {
117     String className = request.getRowProcessorClassName();
118     Class<?> cls;
119     try {
120       cls = Class.forName(className);
121       RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
122       if (request.hasRowProcessorInitializerMessageName()) {
123         Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
124             .asSubclass(Message.class);
125         Method m;
126         try {
127           m = imn.getMethod("parseFrom", ByteString.class);
128         } catch (SecurityException e) {
129           throw new IOException(e);
130         } catch (NoSuchMethodException e) {
131           throw new IOException(e);
132         }
133         S s;
134         try {
135           s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
136         } catch (IllegalArgumentException e) {
137           throw new IOException(e);
138         } catch (InvocationTargetException e) {
139           throw new IOException(e);
140         }
141         ci.initialize(s);
142       }
143       return ci;
144     } catch (ClassNotFoundException e) {
145       throw new IOException(e);
146     } catch (InstantiationException e) {
147       throw new IOException(e);
148     } catch (IllegalAccessException e) {
149       throw new IOException(e);
150     }
151   }
152 }