1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.wal;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicLong;
26
27 import org.apache.commons.logging.Log;
28 import org.apache.commons.logging.LogFactory;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.hbase.Cell;
32 import org.apache.hadoop.hbase.CellUtil;
33 import org.apache.hadoop.hbase.HConstants;
34 import org.apache.hadoop.hbase.HRegionInfo;
35 import org.apache.hadoop.hbase.HTableDescriptor;
36 import org.apache.hadoop.hbase.classification.InterfaceAudience;
37
38 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
39 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
40 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
41 import org.apache.hadoop.hbase.util.FSUtils;
42
43
44
45
46
47
48
49 @InterfaceAudience.Private
50 class DisabledWALProvider implements WALProvider {
51
52 private static final Log LOG = LogFactory.getLog(DisabledWALProvider.class);
53
54 WAL disabled;
55
56 @Override
57 public void init(final WALFactory factory, final Configuration conf,
58 final List<WALActionsListener> listeners, String providerId) throws IOException {
59 if (null != disabled) {
60 throw new IllegalStateException("WALProvider.init should only be called once.");
61 }
62 if (null == providerId) {
63 providerId = "defaultDisabled";
64 }
65 disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null);
66 }
67
68 @Override
69 public WAL getWAL(final byte[] identifier) throws IOException {
70 return disabled;
71 }
72
73 @Override
74 public void close() throws IOException {
75 disabled.close();
76 }
77
78 @Override
79 public void shutdown() throws IOException {
80 disabled.shutdown();
81 }
82
83 private static class DisabledWAL implements WAL {
84 protected final List<WALActionsListener> listeners =
85 new CopyOnWriteArrayList<WALActionsListener>();
86 protected final Path path;
87 protected final WALCoprocessorHost coprocessorHost;
88 protected final AtomicBoolean closed = new AtomicBoolean(false);
89
90 public DisabledWAL(final Path path, final Configuration conf,
91 final List<WALActionsListener> listeners) {
92 this.coprocessorHost = new WALCoprocessorHost(this, conf);
93 this.path = path;
94 if (null != listeners) {
95 for(WALActionsListener listener : listeners) {
96 registerWALActionsListener(listener);
97 }
98 }
99 }
100
101 @Override
102 public void registerWALActionsListener(final WALActionsListener listener) {
103 listeners.add(listener);
104 }
105
106 @Override
107 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
108 return listeners.remove(listener);
109 }
110
111 @Override
112 public byte[][] rollWriter() {
113 if (!listeners.isEmpty()) {
114 for (WALActionsListener listener : listeners) {
115 listener.logRollRequested(false);
116 }
117 for (WALActionsListener listener : listeners) {
118 try {
119 listener.preLogRoll(path, path);
120 } catch (IOException exception) {
121 LOG.debug("Ignoring exception from listener.", exception);
122 }
123 }
124 for (WALActionsListener listener : listeners) {
125 try {
126 listener.postLogRoll(path, path);
127 } catch (IOException exception) {
128 LOG.debug("Ignoring exception from listener.", exception);
129 }
130 }
131 }
132 return null;
133 }
134
135 @Override
136 public byte[][] rollWriter(boolean force) {
137 return rollWriter();
138 }
139
140 @Override
141 public void shutdown() {
142 if(closed.compareAndSet(false, true)) {
143 if (!this.listeners.isEmpty()) {
144 for (WALActionsListener listener : this.listeners) {
145 listener.logCloseRequested();
146 }
147 }
148 }
149 }
150
151 @Override
152 public void close() {
153 shutdown();
154 }
155
156 @Override
157 public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
158 AtomicLong sequenceId, boolean inMemstore, List<Cell> memstoreKVs) {
159 if (!this.listeners.isEmpty()) {
160 final long start = System.nanoTime();
161 long len = 0;
162 for (Cell cell : edits.getCells()) {
163 len += CellUtil.estimatedSerializedSizeOf(cell);
164 }
165 final long elapsed = (System.nanoTime() - start)/1000000l;
166 for (WALActionsListener listener : this.listeners) {
167 listener.postAppend(len, elapsed);
168 }
169 }
170 return -1;
171 }
172
173 @Override
174 public void sync() {
175 if (!this.listeners.isEmpty()) {
176 for (WALActionsListener listener : this.listeners) {
177 listener.postSync(0l, 0);
178 }
179 }
180 }
181
182 @Override
183 public void sync(long txid) {
184 sync();
185 }
186
187 @Override
188 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
189 if (closed.get()) return null;
190 return HConstants.NO_SEQNUM;
191 }
192
193 @Override
194 public void completeCacheFlush(final byte[] encodedRegionName) {
195 }
196
197 @Override
198 public void abortCacheFlush(byte[] encodedRegionName) {
199 }
200
201 @Override
202 public WALCoprocessorHost getCoprocessorHost() {
203 return coprocessorHost;
204 }
205
206 @Override
207 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
208 return HConstants.NO_SEQNUM;
209 }
210
211 @Override
212 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
213 return HConstants.NO_SEQNUM;
214 }
215
216 @Override
217 public String toString() {
218 return "WAL disabled.";
219 }
220 }
221 }