1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver.wal;
22
23 import java.io.IOException;
24 import java.util.List;
25
26 import org.apache.hadoop.hbase.Coprocessor;
27 import org.apache.hadoop.hbase.HRegionInfo;
28 import org.apache.hadoop.hbase.coprocessor.*;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.hbase.wal.WAL;
32 import org.apache.hadoop.hbase.wal.WALKey;
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class WALCoprocessorHost
40 extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
41
42
43
44
45 static class WALEnvironment extends CoprocessorHost.Environment
46 implements WALCoprocessorEnvironment {
47
48 private final WAL wal;
49
50 final boolean useLegacyPre;
51 final boolean useLegacyPost;
52
53 @Override
54 public WAL getWAL() {
55 return wal;
56 }
57
58
59
60
61
62
63
64
65
66
67 public WALEnvironment(Class<?> implClass, final Coprocessor impl,
68 final int priority, final int seq, final Configuration conf,
69 final WAL wal) {
70 super(impl, priority, seq, conf);
71 this.wal = wal;
72
73
74
75
76 useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class,
77 HRegionInfo.class, WALKey.class, WALEdit.class);
78 useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class,
79 HRegionInfo.class, WALKey.class, WALEdit.class);
80 }
81 }
82
83 private final WAL wal;
84
85
86
87
88
89
90 public WALCoprocessorHost(final WAL log, final Configuration conf) {
91
92
93
94
95
96
97 super(null);
98 this.wal = log;
99
100 loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
101 }
102
103 @Override
104 public WALEnvironment createEnvironment(final Class<?> implClass,
105 final Coprocessor instance, final int priority, final int seq,
106 final Configuration conf) {
107 return new WALEnvironment(implClass, instance, priority, seq, conf,
108 this.wal);
109 }
110
111
112
113
114
115
116
117
118 public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
119 throws IOException {
120 boolean bypass = false;
121 if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
122 ObserverContext<WALCoprocessorEnvironment> ctx = null;
123 List<WALEnvironment> envs = coprocessors.get();
124 for (int i = 0; i < envs.size(); i++) {
125 WALEnvironment env = envs.get(i);
126 if (env.getInstance() instanceof WALObserver) {
127 final WALObserver observer = (WALObserver)env.getInstance();
128 ctx = ObserverContext.createAndPrepare(env, ctx);
129 Thread currentThread = Thread.currentThread();
130 ClassLoader cl = currentThread.getContextClassLoader();
131 try {
132 currentThread.setContextClassLoader(env.getClassLoader());
133 if (env.useLegacyPre) {
134 if (logKey instanceof HLogKey) {
135 observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit);
136 } else {
137 legacyWarning(observer.getClass(),
138 "There are wal keys present that are not HLogKey.");
139 }
140 } else {
141 observer.preWALWrite(ctx, info, logKey, logEdit);
142 }
143 } catch (Throwable e) {
144 handleCoprocessorThrowable(env, e);
145 } finally {
146 currentThread.setContextClassLoader(cl);
147 }
148 bypass |= ctx.shouldBypass();
149 if (ctx.shouldComplete()) {
150 break;
151 }
152 }
153 }
154 return bypass;
155 }
156
157
158
159
160
161
162
163 public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
164 throws IOException {
165 if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
166 ObserverContext<WALCoprocessorEnvironment> ctx = null;
167 List<WALEnvironment> envs = coprocessors.get();
168 for (int i = 0; i < envs.size(); i++) {
169 WALEnvironment env = envs.get(i);
170 if (env.getInstance() instanceof WALObserver) {
171 final WALObserver observer = (WALObserver)env.getInstance();
172 ctx = ObserverContext.createAndPrepare(env, ctx);
173 Thread currentThread = Thread.currentThread();
174 ClassLoader cl = currentThread.getContextClassLoader();
175 try {
176 currentThread.setContextClassLoader(env.getClassLoader());
177 if (env.useLegacyPost) {
178 if (logKey instanceof HLogKey) {
179 observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit);
180 } else {
181 legacyWarning(observer.getClass(),
182 "There are wal keys present that are not HLogKey.");
183 }
184 } else {
185 observer.postWALWrite(ctx, info, logKey, logEdit);
186 }
187 } catch (Throwable e) {
188 handleCoprocessorThrowable(env, e);
189 } finally {
190 currentThread.setContextClassLoader(cl);
191 }
192 if (ctx.shouldComplete()) {
193 break;
194 }
195 }
196 }
197 }
198 }