1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.coprocessor;
19
20 import java.io.IOException;
21 import java.lang.reflect.InvocationTargetException;
22 import java.lang.reflect.Method;
23
24 import org.apache.hadoop.hbase.classification.InterfaceAudience;
25 import org.apache.hadoop.hbase.classification.InterfaceStability;
26 import org.apache.hadoop.hbase.Coprocessor;
27 import org.apache.hadoop.hbase.CoprocessorEnvironment;
28 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
29 import org.apache.hadoop.hbase.HConstants;
30 import org.apache.hadoop.hbase.protobuf.ResponseConverter;
31 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessRequest;
32 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.ProcessResponse;
33 import org.apache.hadoop.hbase.protobuf.generated.RowProcessorProtos.RowProcessorService;
34 import org.apache.hadoop.hbase.regionserver.Region;
35 import org.apache.hadoop.hbase.regionserver.RowProcessor;
36
37 import com.google.protobuf.ByteString;
38 import com.google.protobuf.Message;
39 import com.google.protobuf.RpcCallback;
40 import com.google.protobuf.RpcController;
41 import com.google.protobuf.Service;
42
43
44
45
46
47 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
48 @InterfaceStability.Evolving
49 public abstract class BaseRowProcessorEndpoint<S extends Message, T extends Message>
50 extends RowProcessorService implements CoprocessorService, Coprocessor {
51 private RegionCoprocessorEnvironment env;
52
53
54
55
56
57
58
59
60
61
62
63
64
65 @Override
66 public void process(RpcController controller, ProcessRequest request,
67 RpcCallback<ProcessResponse> done) {
68 ProcessResponse resultProto = null;
69 try {
70 RowProcessor<S,T> processor = constructRowProcessorFromRequest(request);
71 Region region = env.getRegion();
72 long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE;
73 long nonce = request.hasNonce() ? request.getNonce() : HConstants.NO_NONCE;
74 region.processRowsWithLocks(processor, nonceGroup, nonce);
75 T result = processor.getResult();
76 ProcessResponse.Builder b = ProcessResponse.newBuilder();
77 b.setRowProcessorResult(result.toByteString());
78 resultProto = b.build();
79 } catch (Exception e) {
80 ResponseConverter.setControllerException(controller, new IOException(e));
81 }
82 done.run(resultProto);
83 }
84
85 @Override
86 public Service getService() {
87 return this;
88 }
89
90
91
92
93
94
95
96
97
98
99
100 @Override
101 public void start(CoprocessorEnvironment env) throws IOException {
102 if (env instanceof RegionCoprocessorEnvironment) {
103 this.env = (RegionCoprocessorEnvironment)env;
104 } else {
105 throw new CoprocessorException("Must be loaded on a table region!");
106 }
107 }
108
109 @Override
110 public void stop(CoprocessorEnvironment env) throws IOException {
111
112 }
113
114 @SuppressWarnings("unchecked")
115 RowProcessor<S,T> constructRowProcessorFromRequest(ProcessRequest request)
116 throws IOException {
117 String className = request.getRowProcessorClassName();
118 Class<?> cls;
119 try {
120 cls = Class.forName(className);
121 RowProcessor<S,T> ci = (RowProcessor<S,T>) cls.newInstance();
122 if (request.hasRowProcessorInitializerMessageName()) {
123 Class<?> imn = Class.forName(request.getRowProcessorInitializerMessageName())
124 .asSubclass(Message.class);
125 Method m;
126 try {
127 m = imn.getMethod("parseFrom", ByteString.class);
128 } catch (SecurityException e) {
129 throw new IOException(e);
130 } catch (NoSuchMethodException e) {
131 throw new IOException(e);
132 }
133 S s;
134 try {
135 s = (S)m.invoke(null,request.getRowProcessorInitializerMessage());
136 } catch (IllegalArgumentException e) {
137 throw new IOException(e);
138 } catch (InvocationTargetException e) {
139 throw new IOException(e);
140 }
141 ci.initialize(s);
142 }
143 return ci;
144 } catch (ClassNotFoundException e) {
145 throw new IOException(e);
146 } catch (InstantiationException e) {
147 throw new IOException(e);
148 } catch (IllegalAccessException e) {
149 throw new IOException(e);
150 }
151 }
152 }