1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.procedure2.store.wal;
20
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.Map;
24 import java.util.HashMap;
25
26 import org.apache.commons.logging.Log;
27 import org.apache.commons.logging.LogFactory;
28 import org.apache.hadoop.fs.FSDataInputStream;
29 import org.apache.hadoop.hbase.classification.InterfaceAudience;
30 import org.apache.hadoop.hbase.classification.InterfaceStability;
31 import org.apache.hadoop.hbase.ProcedureInfo;
32 import org.apache.hadoop.hbase.procedure2.Procedure;
33 import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
34 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
35 import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
36
37
38
39
40 @InterfaceAudience.Private
41 @InterfaceStability.Evolving
42 public class ProcedureWALFormatReader {
43 private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
44
45 private final ProcedureStoreTracker tracker;
46
47
48 private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
49 private final Map<Long, ProcedureProtos.Procedure> localProcedures =
50 new HashMap<Long, ProcedureProtos.Procedure>();
51
52 private long maxProcId = 0;
53
54 public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
55 this.tracker = tracker;
56 }
57
58 public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
59 FSDataInputStream stream = log.getStream();
60 try {
61 boolean hasMore = true;
62 while (hasMore) {
63 ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
64 if (entry == null) {
65 LOG.warn("nothing left to decode. exiting with missing EOF");
66 hasMore = false;
67 break;
68 }
69 switch (entry.getType()) {
70 case INIT:
71 readInitEntry(entry);
72 break;
73 case INSERT:
74 readInsertEntry(entry);
75 break;
76 case UPDATE:
77 case COMPACT:
78 readUpdateEntry(entry);
79 break;
80 case DELETE:
81 readDeleteEntry(entry);
82 break;
83 case EOF:
84 hasMore = false;
85 break;
86 default:
87 throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
88 }
89 }
90 } catch (IOException e) {
91 LOG.error("got an exception while reading the procedure WAL: " + log, e);
92 loader.markCorruptedWAL(log, e);
93 }
94
95 if (!localProcedures.isEmpty()) {
96 Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
97 localProcedures.entrySet().iterator();
98 long minProcId = Long.MAX_VALUE;
99 long maxProcId = Long.MIN_VALUE;
100 while (itd.hasNext()) {
101 Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
102 itd.remove();
103
104 long procId = entry.getKey();
105 minProcId = Math.min(minProcId, procId);
106 maxProcId = Math.max(maxProcId, procId);
107
108
109 Procedure proc = Procedure.convert(entry.getValue());
110 procedures.put(procId, proc);
111 }
112
113
114
115 log.setProcIds(minProcId, maxProcId);
116 }
117 }
118
119 public Iterator<Procedure> getProcedures() {
120 return procedures.values().iterator();
121 }
122
123 private void loadEntries(final ProcedureWALEntry entry) {
124 for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
125 maxProcId = Math.max(maxProcId, proc.getProcId());
126 if (isRequired(proc.getProcId())) {
127 if (LOG.isTraceEnabled()) {
128 LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
129 }
130 localProcedures.put(proc.getProcId(), proc);
131 tracker.setDeleted(proc.getProcId(), false);
132 }
133 }
134 }
135
136 private void readInitEntry(final ProcedureWALEntry entry)
137 throws IOException {
138 assert entry.getProcedureCount() == 1 : "Expected only one procedure";
139
140 loadEntries(entry);
141 }
142
143 private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
144 assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
145 loadEntries(entry);
146 }
147
148 private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
149 assert entry.getProcedureCount() == 1 : "Expected only one procedure";
150 loadEntries(entry);
151 }
152
153 private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
154 assert entry.getProcedureCount() == 0 : "Expected no procedures";
155 assert entry.hasProcId() : "expected ProcID";
156 if (LOG.isTraceEnabled()) {
157 LOG.trace("read delete entry " + entry.getProcId());
158 }
159 maxProcId = Math.max(maxProcId, entry.getProcId());
160 localProcedures.remove(entry.getProcId());
161 assert !procedures.containsKey(entry.getProcId());
162 tracker.setDeleted(entry.getProcId(), true);
163 }
164
165 private boolean isDeleted(final long procId) {
166 return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
167 }
168
169 private boolean isRequired(final long procId) {
170 return !isDeleted(procId) && !procedures.containsKey(procId);
171 }
172 }