View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.NavigableMap;
26  import java.util.TreeMap;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.hbase.Cell;
32  import org.apache.hadoop.hbase.CellUtil;
33  import org.apache.hadoop.hbase.HRegionInfo;
34  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
35  import org.apache.hadoop.hbase.KeyValue;
36  import org.apache.hadoop.hbase.KeyValueUtil;
37  import org.apache.hadoop.hbase.codec.Codec;
38  import org.apache.hadoop.hbase.io.HeapSize;
39  import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
40  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
41  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FlushDescriptor;
42  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.RegionEventDescriptor;
43  import org.apache.hadoop.hbase.util.Bytes;
44  import org.apache.hadoop.hbase.util.ClassSize;
45  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
46  import org.apache.hadoop.io.Writable;
47  
48  import com.google.common.annotations.VisibleForTesting;
49  
50  
51  /**
52   * WALEdit: Used in HBase's transaction log (WAL) to represent
53   * the collection of edits (KeyValue objects) corresponding to a
54   * single transaction. The class implements "Writable" interface
55   * for serializing/deserializing a set of KeyValue items.
56   *
57   * Previously, if a transaction contains 3 edits to c1, c2, c3 for a row R,
58   * the WAL would have three log entries as follows:
59   *
60   *    <logseq1-for-edit1>:<eyValue-for-edit-c1>
61   *    <logseq2-for-edit2>:<KeyValue-for-edit-c2>
62   *    <logseq3-for-edit3>:<KeyValue-for-edit-c3>
63   *
64   * This presents problems because row level atomicity of transactions
65   * was not guaranteed. If we crash after few of the above appends make
66   * it, then recovery will restore a partial transaction.
67   *
68   * In the new world, all the edits for a given transaction are written
69   * out as a single record, for example:
70   *
71   *   <logseq#-for-entire-txn>:<WALEdit-for-entire-txn>
72   *
73   * where, the WALEdit is serialized as:
74   *   <-1, # of edits, <KeyValue>, <KeyValue>, ... >
75   * For example:
76   *   <-1, 3, <KV-for-edit-c1>, <KV-for-edit-c2>, <KV-for-edit-c3>>
77   *
78   * The -1 marker is just a special way of being backward compatible with
79   * an old WAL which would have contained a single <KeyValue>.
80   *
81   * The deserializer for WALEdit backward compatibly detects if the record
82   * is an old style KeyValue or the new style WALEdit.
83   *
84   */
85  @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.REPLICATION,
86      HBaseInterfaceAudience.COPROC })
87  public class WALEdit implements Writable, HeapSize {
88    private static final Log LOG = LogFactory.getLog(WALEdit.class);
89  
90    // TODO: Get rid of this; see HBASE-8457
91    public static final byte [] METAFAMILY = Bytes.toBytes("METAFAMILY");
92    static final byte [] METAROW = Bytes.toBytes("METAROW");
93    static final byte[] COMPACTION = Bytes.toBytes("HBASE::COMPACTION");
94    static final byte [] FLUSH = Bytes.toBytes("HBASE::FLUSH");
95    static final byte [] REGION_EVENT = Bytes.toBytes("HBASE::REGION_EVENT");
96    @VisibleForTesting
97    public static final byte [] BULK_LOAD = Bytes.toBytes("HBASE::BULK_LOAD");
98  
99    private final int VERSION_2 = -1;
100   private final boolean isReplay;
101 
102   private ArrayList<Cell> cells = null;
103 
104   public static final WALEdit EMPTY_WALEDIT = new WALEdit();
105 
106   // Only here for legacy writable deserialization
107   /**
108    * @deprecated Legacy
109    */
110   @Deprecated
111   private NavigableMap<byte[], Integer> scopes;
112 
113   private CompressionContext compressionContext;
114 
115   public WALEdit() {
116     this(false);
117   }
118 
119   public WALEdit(boolean isReplay) {
120     this.isReplay = isReplay;
121     cells = new ArrayList<Cell>(1);
122   }
123 
124   public WALEdit(int cellCount) {
125     this.isReplay = false;
126     cells = new ArrayList<Cell>(cellCount);
127   }
128 
129   /**
130    * @param f
131    * @return True is <code>f</code> is {@link #METAFAMILY}
132    */
133   public static boolean isMetaEditFamily(final byte [] f) {
134     return Bytes.equals(METAFAMILY, f);
135   }
136 
137   public static boolean isMetaEditFamily(Cell cell) {
138     return CellUtil.matchingFamily(cell, METAFAMILY);
139   }
140 
141   public boolean isMetaEdit() {
142     for (Cell cell: cells) {
143       if (!isMetaEditFamily(cell)) {
144         return false;
145       }
146     }
147     return true;
148   }
149 
150   /**
151    * @return True when current WALEdit is created by log replay. Replication skips WALEdits from
152    *         replay.
153    */
154   public boolean isReplay() {
155     return this.isReplay;
156   }
157 
158   public void setCompressionContext(final CompressionContext compressionContext) {
159     this.compressionContext = compressionContext;
160   }
161 
162   public WALEdit add(Cell cell) {
163     this.cells.add(cell);
164     return this;
165   }
166 
167   public boolean isEmpty() {
168     return cells.isEmpty();
169   }
170 
171   public int size() {
172     return cells.size();
173   }
174 
175   public ArrayList<Cell> getCells() {
176     return cells;
177   }
178 
179   /**
180    * This is not thread safe.
181    * This will change the WALEdit and shouldn't be used unless you are sure that nothing
182    * else depends on the contents being immutable.
183    *
184    * @param cells the list of cells that this WALEdit now contains.
185    */
186   @InterfaceAudience.Private
187   public void setCells(ArrayList<Cell> cells) {
188     this.cells = cells;
189   }
190 
191   public NavigableMap<byte[], Integer> getAndRemoveScopes() {
192     NavigableMap<byte[], Integer> result = scopes;
193     scopes = null;
194     return result;
195   }
196 
197   @Override
198   public void readFields(DataInput in) throws IOException {
199     cells.clear();
200     if (scopes != null) {
201       scopes.clear();
202     }
203     int versionOrLength = in.readInt();
204     // TODO: Change version when we protobuf.  Also, change way we serialize KV!  Pb it too.
205     if (versionOrLength == VERSION_2) {
206       // this is new style WAL entry containing multiple KeyValues.
207       int numEdits = in.readInt();
208       for (int idx = 0; idx < numEdits; idx++) {
209         if (compressionContext != null) {
210           this.add(KeyValueCompression.readKV(in, compressionContext));
211         } else {
212           this.add(KeyValueUtil.create(in));
213         }
214       }
215       int numFamilies = in.readInt();
216       if (numFamilies > 0) {
217         if (scopes == null) {
218           scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
219         }
220         for (int i = 0; i < numFamilies; i++) {
221           byte[] fam = Bytes.readByteArray(in);
222           int scope = in.readInt();
223           scopes.put(fam, scope);
224         }
225       }
226     } else {
227       // this is an old style WAL entry. The int that we just
228       // read is actually the length of a single KeyValue
229       this.add(KeyValueUtil.create(versionOrLength, in));
230     }
231   }
232 
233   @Override
234   public void write(DataOutput out) throws IOException {
235     LOG.warn("WALEdit is being serialized to writable - only expected in test code");
236     out.writeInt(VERSION_2);
237     out.writeInt(cells.size());
238     // We interleave the two lists for code simplicity
239     for (Cell cell : cells) {
240       // This is not used in any of the core code flows so it is just fine to convert to KV
241       KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
242       if (compressionContext != null) {
243         KeyValueCompression.writeKV(out, kv, compressionContext);
244       } else{
245         KeyValueUtil.write(kv, out);
246       }
247     }
248     if (scopes == null) {
249       out.writeInt(0);
250     } else {
251       out.writeInt(scopes.size());
252       for (byte[] key : scopes.keySet()) {
253         Bytes.writeByteArray(out, key);
254         out.writeInt(scopes.get(key));
255       }
256     }
257   }
258 
259   /**
260    * Reads WALEdit from cells.
261    * @param cellDecoder Cell decoder.
262    * @param expectedCount Expected cell count.
263    * @return Number of KVs read.
264    */
265   public int readFromCells(Codec.Decoder cellDecoder, int expectedCount) throws IOException {
266     cells.clear();
267     cells.ensureCapacity(expectedCount);
268     while (cells.size() < expectedCount && cellDecoder.advance()) {
269       cells.add(cellDecoder.current());
270     }
271     return cells.size();
272   }
273 
274   @Override
275   public long heapSize() {
276     long ret = ClassSize.ARRAYLIST;
277     for (Cell cell : cells) {
278       ret += CellUtil.estimatedHeapSizeOf(cell);
279     }
280     if (scopes != null) {
281       ret += ClassSize.TREEMAP;
282       ret += ClassSize.align(scopes.size() * ClassSize.MAP_ENTRY);
283       // TODO this isn't quite right, need help here
284     }
285     return ret;
286   }
287 
288   @Override
289   public String toString() {
290     StringBuilder sb = new StringBuilder();
291 
292     sb.append("[#edits: " + cells.size() + " = <");
293     for (Cell cell : cells) {
294       sb.append(cell);
295       sb.append("; ");
296     }
297     if (scopes != null) {
298       sb.append(" scopes: " + scopes.toString());
299     }
300     sb.append(">]");
301     return sb.toString();
302   }
303 
304   public static WALEdit createFlushWALEdit(HRegionInfo hri, FlushDescriptor f) {
305     KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, FLUSH,
306       EnvironmentEdgeManager.currentTime(), f.toByteArray());
307     return new WALEdit().add(kv);
308   }
309 
310   public static FlushDescriptor getFlushDescriptor(Cell cell) throws IOException {
311     if (CellUtil.matchingColumn(cell, METAFAMILY, FLUSH)) {
312       return FlushDescriptor.parseFrom(CellUtil.cloneValue(cell));
313     }
314     return null;
315   }
316 
317   public static WALEdit createRegionEventWALEdit(HRegionInfo hri,
318       RegionEventDescriptor regionEventDesc) {
319     KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, REGION_EVENT,
320       EnvironmentEdgeManager.currentTime(), regionEventDesc.toByteArray());
321     return new WALEdit().add(kv);
322   }
323 
324   public static RegionEventDescriptor getRegionEventDescriptor(Cell cell) throws IOException {
325     if (CellUtil.matchingColumn(cell, METAFAMILY, REGION_EVENT)) {
326       return RegionEventDescriptor.parseFrom(CellUtil.cloneValue(cell));
327     }
328     return null;
329   }
330 
331   /**
332    * Create a compaction WALEdit
333    * @param c
334    * @return A WALEdit that has <code>c</code> serialized as its value
335    */
336   public static WALEdit createCompaction(final HRegionInfo hri, final CompactionDescriptor c) {
337     byte [] pbbytes = c.toByteArray();
338     KeyValue kv = new KeyValue(getRowForRegion(hri), METAFAMILY, COMPACTION,
339       EnvironmentEdgeManager.currentTime(), pbbytes);
340     return new WALEdit().add(kv); //replication scope null so that this won't be replicated
341   }
342 
343   private static byte[] getRowForRegion(HRegionInfo hri) {
344     byte[] startKey = hri.getStartKey();
345     if (startKey.length == 0) {
346       // empty row key is not allowed in mutations because it is both the start key and the end key
347       // we return the smallest byte[] that is bigger (in lex comparison) than byte[0].
348       return new byte[] {0};
349     }
350     return startKey;
351   }
352 
353   /**
354    * Deserialized and returns a CompactionDescriptor is the KeyValue contains one.
355    * @param kv the key value
356    * @return deserialized CompactionDescriptor or null.
357    */
358   public static CompactionDescriptor getCompaction(Cell kv) throws IOException {
359     if (CellUtil.matchingColumn(kv, METAFAMILY, COMPACTION)) {
360       return CompactionDescriptor.parseFrom(CellUtil.cloneValue(kv));
361     }
362     return null;
363   }
364 
365   /**
366    * Create a bulk loader WALEdit
367    *
368    * @param hri                The HRegionInfo for the region in which we are bulk loading
369    * @param bulkLoadDescriptor The descriptor for the Bulk Loader
370    * @return The WALEdit for the BulkLoad
371    */
372   public static WALEdit createBulkLoadEvent(HRegionInfo hri,
373                                             WALProtos.BulkLoadDescriptor bulkLoadDescriptor) {
374     KeyValue kv = new KeyValue(getRowForRegion(hri),
375         METAFAMILY,
376         BULK_LOAD,
377         EnvironmentEdgeManager.currentTime(),
378         bulkLoadDescriptor.toByteArray());
379     return new WALEdit().add(kv);
380   }
381 
382   /**
383    * Deserialized and returns a BulkLoadDescriptor from the passed in Cell
384    * @param cell the key value
385    * @return deserialized BulkLoadDescriptor or null.
386    */
387   public static WALProtos.BulkLoadDescriptor getBulkLoadDescriptor(Cell cell) throws IOException {
388     if (CellUtil.matchingColumn(cell, METAFAMILY, BULK_LOAD)) {
389       return WALProtos.BulkLoadDescriptor.parseFrom(CellUtil.cloneValue(cell));
390     }
391     return null;
392   }
393 }