1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.wal;
19
20 import java.io.IOException;
21 import java.util.List;
22 import java.util.Set;
23 import java.util.concurrent.CopyOnWriteArrayList;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import org.apache.commons.logging.Log;
26 import org.apache.commons.logging.LogFactory;
27 import org.apache.hadoop.conf.Configuration;
28 import org.apache.hadoop.fs.Path;
29 import org.apache.hadoop.hbase.Cell;
30 import org.apache.hadoop.hbase.CellUtil;
31 import org.apache.hadoop.hbase.HConstants;
32 import org.apache.hadoop.hbase.HRegionInfo;
33 import org.apache.hadoop.hbase.HTableDescriptor;
34 import org.apache.hadoop.hbase.classification.InterfaceAudience;
35
36 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
37 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
38 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
39 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
40 import org.apache.hadoop.hbase.util.FSUtils;
41
42
43
44
45
46
47
48 @InterfaceAudience.Private
49 class DisabledWALProvider implements WALProvider {
50
51 private static final Log LOG = LogFactory.getLog(DisabledWALProvider.class);
52
53 WAL disabled;
54
55 @Override
56 public void init(final WALFactory factory, final Configuration conf,
57 final List<WALActionsListener> listeners, String providerId) throws IOException {
58 if (null != disabled) {
59 throw new IllegalStateException("WALProvider.init should only be called once.");
60 }
61 if (null == providerId) {
62 providerId = "defaultDisabled";
63 }
64 disabled = new DisabledWAL(new Path(FSUtils.getRootDir(conf), providerId), conf, null);
65 }
66
67 @Override
68 public WAL getWAL(final byte[] identifier) throws IOException {
69 return disabled;
70 }
71
72 @Override
73 public void close() throws IOException {
74 disabled.close();
75 }
76
77 @Override
78 public void shutdown() throws IOException {
79 disabled.shutdown();
80 }
81
82 private static class DisabledWAL implements WAL {
83 protected final List<WALActionsListener> listeners =
84 new CopyOnWriteArrayList<WALActionsListener>();
85 protected final Path path;
86 protected final WALCoprocessorHost coprocessorHost;
87 protected final AtomicBoolean closed = new AtomicBoolean(false);
88
89 public DisabledWAL(final Path path, final Configuration conf,
90 final List<WALActionsListener> listeners) {
91 this.coprocessorHost = new WALCoprocessorHost(this, conf);
92 this.path = path;
93 if (null != listeners) {
94 for(WALActionsListener listener : listeners) {
95 registerWALActionsListener(listener);
96 }
97 }
98 }
99
100 @Override
101 public void registerWALActionsListener(final WALActionsListener listener) {
102 listeners.add(listener);
103 }
104
105 @Override
106 public boolean unregisterWALActionsListener(final WALActionsListener listener) {
107 return listeners.remove(listener);
108 }
109
110 @Override
111 public byte[][] rollWriter() {
112 if (!listeners.isEmpty()) {
113 for (WALActionsListener listener : listeners) {
114 listener.logRollRequested(false);
115 }
116 for (WALActionsListener listener : listeners) {
117 try {
118 listener.preLogRoll(path, path);
119 } catch (IOException exception) {
120 LOG.debug("Ignoring exception from listener.", exception);
121 }
122 }
123 for (WALActionsListener listener : listeners) {
124 try {
125 listener.postLogRoll(path, path);
126 } catch (IOException exception) {
127 LOG.debug("Ignoring exception from listener.", exception);
128 }
129 }
130 }
131 return null;
132 }
133
134 @Override
135 public byte[][] rollWriter(boolean force) {
136 return rollWriter();
137 }
138
139 @Override
140 public void shutdown() {
141 if(closed.compareAndSet(false, true)) {
142 if (!this.listeners.isEmpty()) {
143 for (WALActionsListener listener : this.listeners) {
144 listener.logCloseRequested();
145 }
146 }
147 }
148 }
149
150 @Override
151 public void close() {
152 shutdown();
153 }
154
155 @Override
156 public long append(HTableDescriptor htd, HRegionInfo info, WALKey key, WALEdit edits,
157 boolean inMemstore) throws IOException {
158 WriteEntry writeEntry = key.getMvcc().begin();
159 if (!edits.isReplay()) {
160 for (Cell cell : edits.getCells()) {
161 CellUtil.setSequenceId(cell, writeEntry.getWriteNumber());
162 }
163 }
164 key.setWriteEntry(writeEntry);
165 if (!this.listeners.isEmpty()) {
166 final long start = System.nanoTime();
167 long len = 0;
168 for (Cell cell : edits.getCells()) {
169 len += CellUtil.estimatedSerializedSizeOf(cell);
170 }
171 final long elapsed = (System.nanoTime() - start)/1000000l;
172 for (WALActionsListener listener : this.listeners) {
173 listener.postAppend(len, elapsed);
174 }
175 }
176 return -1;
177 }
178
179 @Override
180 public void sync() {
181 if (!this.listeners.isEmpty()) {
182 for (WALActionsListener listener : this.listeners) {
183 listener.postSync(0l, 0);
184 }
185 }
186 }
187
188 @Override
189 public void sync(long txid) {
190 sync();
191 }
192
193 @Override
194 public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedFamilyNames) {
195 if (closed.get()) return null;
196 return HConstants.NO_SEQNUM;
197 }
198
199 @Override
200 public void completeCacheFlush(final byte[] encodedRegionName) {
201 }
202
203 @Override
204 public void abortCacheFlush(byte[] encodedRegionName) {
205 }
206
207 @Override
208 public WALCoprocessorHost getCoprocessorHost() {
209 return coprocessorHost;
210 }
211
212 @Override
213 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) {
214 return HConstants.NO_SEQNUM;
215 }
216
217 @Override
218 public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
219 return HConstants.NO_SEQNUM;
220 }
221
222 @Override
223 public String toString() {
224 return "WAL disabled.";
225 }
226 }
227 }