1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.apache.hadoop.hbase.regionserver.wal;
22
23 import java.io.IOException;
24
25 import org.apache.hadoop.hbase.Coprocessor;
26 import org.apache.hadoop.hbase.HRegionInfo;
27 import org.apache.hadoop.hbase.coprocessor.*;
28 import org.apache.hadoop.hbase.classification.InterfaceAudience;
29 import org.apache.hadoop.conf.Configuration;
30
31 import org.apache.hadoop.hbase.wal.WAL;
32 import org.apache.hadoop.hbase.wal.WALKey;
33
34
35
36
37
38 @InterfaceAudience.Private
39 public class WALCoprocessorHost
40 extends CoprocessorHost<WALCoprocessorHost.WALEnvironment> {
41
42
43
44
45 static class WALEnvironment extends CoprocessorHost.Environment
46 implements WALCoprocessorEnvironment {
47
48 private final WAL wal;
49
50 final boolean useLegacyPre;
51 final boolean useLegacyPost;
52
53 @Override
54 public WAL getWAL() {
55 return wal;
56 }
57
58
59
60
61
62
63
64
65
66
67 public WALEnvironment(Class<?> implClass, final Coprocessor impl,
68 final int priority, final int seq, final Configuration conf,
69 final WAL wal) {
70 super(impl, priority, seq, conf);
71 this.wal = wal;
72
73
74
75
76 useLegacyPre = useLegacyMethod(impl.getClass(), "preWALWrite", ObserverContext.class,
77 HRegionInfo.class, WALKey.class, WALEdit.class);
78 useLegacyPost = useLegacyMethod(impl.getClass(), "postWALWrite", ObserverContext.class,
79 HRegionInfo.class, WALKey.class, WALEdit.class);
80 }
81 }
82
83 private final WAL wal;
84
85
86
87
88
89
90 public WALCoprocessorHost(final WAL log, final Configuration conf) {
91
92
93
94
95
96
97 super(null);
98 this.wal = log;
99
100 loadSystemCoprocessors(conf, WAL_COPROCESSOR_CONF_KEY);
101 }
102
103 @Override
104 public WALEnvironment createEnvironment(final Class<?> implClass,
105 final Coprocessor instance, final int priority, final int seq,
106 final Configuration conf) {
107 return new WALEnvironment(implClass, instance, priority, seq, conf,
108 this.wal);
109 }
110
111
112
113
114
115
116
117
118 public boolean preWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
119 throws IOException {
120 boolean bypass = false;
121 if (this.coprocessors == null || this.coprocessors.isEmpty()) return bypass;
122 ObserverContext<WALCoprocessorEnvironment> ctx = null;
123 for (WALEnvironment env: coprocessors) {
124 if (env.getInstance() instanceof WALObserver) {
125 final WALObserver observer = (WALObserver)env.getInstance();
126 ctx = ObserverContext.createAndPrepare(env, ctx);
127 Thread currentThread = Thread.currentThread();
128 ClassLoader cl = currentThread.getContextClassLoader();
129 try {
130 currentThread.setContextClassLoader(env.getClassLoader());
131 if (env.useLegacyPre) {
132 if (logKey instanceof HLogKey) {
133 observer.preWALWrite(ctx, info, (HLogKey)logKey, logEdit);
134 } else {
135 legacyWarning(observer.getClass(),
136 "There are wal keys present that are not HLogKey.");
137 }
138 } else {
139 observer.preWALWrite(ctx, info, logKey, logEdit);
140 }
141 } catch (Throwable e) {
142 handleCoprocessorThrowable(env, e);
143 } finally {
144 currentThread.setContextClassLoader(cl);
145 }
146 bypass |= ctx.shouldBypass();
147 if (ctx.shouldComplete()) {
148 break;
149 }
150 }
151 }
152 return bypass;
153 }
154
155
156
157
158
159
160
161 public void postWALWrite(final HRegionInfo info, final WALKey logKey, final WALEdit logEdit)
162 throws IOException {
163 if (this.coprocessors == null || this.coprocessors.isEmpty()) return;
164 ObserverContext<WALCoprocessorEnvironment> ctx = null;
165 for (WALEnvironment env: coprocessors) {
166 if (env.getInstance() instanceof WALObserver) {
167 final WALObserver observer = (WALObserver)env.getInstance();
168 ctx = ObserverContext.createAndPrepare(env, ctx);
169 Thread currentThread = Thread.currentThread();
170 ClassLoader cl = currentThread.getContextClassLoader();
171 try {
172 currentThread.setContextClassLoader(env.getClassLoader());
173 if (env.useLegacyPost) {
174 if (logKey instanceof HLogKey) {
175 observer.postWALWrite(ctx, info, (HLogKey)logKey, logEdit);
176 } else {
177 legacyWarning(observer.getClass(),
178 "There are wal keys present that are not HLogKey.");
179 }
180 } else {
181 observer.postWALWrite(ctx, info, logKey, logEdit);
182 }
183 } catch (Throwable e) {
184 handleCoprocessorThrowable(env, e);
185 } finally {
186 currentThread.setContextClassLoader(cl);
187 }
188 if (ctx.shouldComplete()) {
189 break;
190 }
191 }
192 }
193 }
194 }