001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.coprocessor; 019 020import java.io.IOException; 021import java.lang.reflect.InvocationTargetException; 022import java.lang.reflect.Method; 023import java.util.Collections; 024 025import org.apache.yetus.audience.InterfaceAudience; 026import org.apache.yetus.audience.InterfaceStability; 027import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; 028import org.apache.hadoop.hbase.CoprocessorEnvironment; 029import org.apache.hadoop.hbase.HBaseInterfaceAudience; 030import org.apache.hadoop.hbase.HConstants; 031import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest; 032import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse; 033import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService; 034import org.apache.hadoop.hbase.regionserver.Region; 035import org.apache.hadoop.hbase.regionserver.RowProcessor; 036 037import com.google.protobuf.ByteString; 038import com.google.protobuf.Message; 039import com.google.protobuf.RpcCallback; 040import com.google.protobuf.RpcController; 041import com.google.protobuf.Service; 042 043/** 044 * This class demonstrates how to implement atomic read-modify-writes 045 * using {@link Region#processRowsWithLocks} and Coprocessor endpoints. 046 */ 047@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) 048@InterfaceStability.Evolving 049public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message> 050extends RowProcessorService implements RegionCoprocessor { 051 private RegionCoprocessorEnvironment env; 052 /** 053 * Pass a processor to region to process multiple rows atomically. 054 * 055 * The RowProcessor implementations should be the inner classes of your 056 * RowProcessorEndpoint. This way the RowProcessor can be class-loaded with 057 * the Coprocessor endpoint together. 058 * 059 * See {@code TestRowProcessorEndpoint} for example. 060 * 061 * The request contains information for constructing processor 062 * (see {@link #constructRowProcessorFromRequest}. The processor object defines 063 * the read-modify-write procedure. 064 */ 065 @Override 066 public void process(RpcController controller, ProcessRequest request, 067 RpcCallback<ProcessResponse> done) { 068 ProcessResponse resultProto = null; 069 try { 070 RowProcessor<S,T> processor = constructRowProcessorFromRequest(request); 071 Region region = env.getRegion(); 072 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; 073 long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE; 074 region.processRowsWithLocks(processor, nonceGroup, nonce); 075 T result = processor.getResult(); 076 ProcessResponse.Builder b = ProcessResponse.newBuilder(); 077 b.setRowProcessorResult(result.toByteString()); 078 resultProto = b.build(); 079 } catch (Exception e) { 080 CoprocessorRpcUtils.setControllerException(controller, new IOException(e)); 081 } 082 done.run(resultProto); 083 } 084 085 @Override 086 public Iterable<Service> getServices() { 087 return Collections.singleton(this); 088 } 089 090 /** 091 * Stores a reference to the coprocessor environment provided by the 092 * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost} from the region where this 093 * coprocessor is loaded. Since this is a coprocessor endpoint, it always expects to be loaded 094 * on a table region, so always expects this to be an instance of 095 * {@link RegionCoprocessorEnvironment}. 096 * @param env the environment provided by the coprocessor host 097 * @throws IOException if the provided environment is not an instance of 098 * {@code RegionCoprocessorEnvironment} 099 */ 100 @Override 101 public void start(CoprocessorEnvironment env) throws IOException { 102 if (env instanceof RegionCoprocessorEnvironment) { 103 this.env = (RegionCoprocessorEnvironment)env; 104 } else { 105 throw new CoprocessorException("Must be loaded on a table region!"); 106 } 107 } 108 109 @Override 110 public void stop(CoprocessorEnvironment env) throws IOException { 111 // nothing to do 112 } 113 114 @SuppressWarnings("unchecked") 115 RowProcessor<S,T> constructRowProcessorFromRequest(ProcessRequest request) 116 throws IOException { 117 String className = request.getRowProcessorClassName(); 118 Class<?> cls; 119 try { 120 cls = Class.forName(className); 121 RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.getDeclaredConstructor().newInstance(); 122 if (request.hasRowProcessorInitializerMessageName()) { 123 Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName()) 124 .asSubclass(Message.class); 125 Method m; 126 try { 127 m = imn.getMethod("parseFrom", ByteString.class); 128 } catch (SecurityException e) { 129 throw new IOException(e); 130 } catch (NoSuchMethodException e) { 131 throw new IOException(e); 132 } 133 S s; 134 try { 135 s = (S)m.invoke(null,request.getRowProcessorInitializerMessage()); 136 } catch (IllegalArgumentException e) { 137 throw new IOException(e); 138 } catch (InvocationTargetException e) { 139 throw new IOException(e); 140 } 141 ci.initialize(s); 142 } 143 return ci; 144 } catch (Exception e) { 145 throw new IOException(e); 146 } 147 } 148}