View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.procedure2.store.wal;
20  
21  import java.io.IOException;
22  import java.util.Iterator;
23  import java.util.Map;
24  import java.util.HashMap;
25  
26  import org.apache.commons.logging.Log;
27  import org.apache.commons.logging.LogFactory;
28  import org.apache.hadoop.fs.FSDataInputStream;
29  import org.apache.hadoop.hbase.classification.InterfaceAudience;
30  import org.apache.hadoop.hbase.classification.InterfaceStability;
31  import org.apache.hadoop.hbase.ProcedureInfo;
32  import org.apache.hadoop.hbase.procedure2.Procedure;
33  import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
34  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
35  import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
36  
37  /**
38   * Helper class that loads the procedures stored in a WAL
39   */
40  @InterfaceAudience.Private
41  @InterfaceStability.Evolving
42  public class ProcedureWALFormatReader {
43    private static final Log LOG = LogFactory.getLog(ProcedureWALFormatReader.class);
44  
45    private final ProcedureStoreTracker tracker;
46    //private final long compactionLogId;
47  
48    private final Map<Long, Procedure> procedures = new HashMap<Long, Procedure>();
49    private final Map<Long, ProcedureProtos.Procedure> localProcedures =
50      new HashMap<Long, ProcedureProtos.Procedure>();
51  
52    private long maxProcId = 0;
53  
54    public ProcedureWALFormatReader(final ProcedureStoreTracker tracker) {
55      this.tracker = tracker;
56    }
57  
58    public void read(ProcedureWALFile log, ProcedureWALFormat.Loader loader) throws IOException {
59      FSDataInputStream stream = log.getStream();
60      try {
61        boolean hasMore = true;
62        while (hasMore) {
63          ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
64          if (entry == null) {
65            LOG.warn("nothing left to decode. exiting with missing EOF");
66            hasMore = false;
67            break;
68          }
69          switch (entry.getType()) {
70            case INIT:
71              readInitEntry(entry);
72              break;
73            case INSERT:
74              readInsertEntry(entry);
75              break;
76            case UPDATE:
77            case COMPACT:
78              readUpdateEntry(entry);
79              break;
80            case DELETE:
81              readDeleteEntry(entry);
82              break;
83            case EOF:
84              hasMore = false;
85              break;
86            default:
87              throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
88          }
89        }
90      } catch (IOException e) {
91        LOG.error("got an exception while reading the procedure WAL: " + log, e);
92        loader.markCorruptedWAL(log, e);
93      }
94  
95      if (!localProcedures.isEmpty()) {
96        Iterator<Map.Entry<Long, ProcedureProtos.Procedure>> itd =
97          localProcedures.entrySet().iterator();
98        long minProcId = Long.MAX_VALUE;
99        long maxProcId = Long.MIN_VALUE;
100       while (itd.hasNext()) {
101         Map.Entry<Long, ProcedureProtos.Procedure> entry = itd.next();
102         itd.remove();
103 
104         long procId = entry.getKey();
105         minProcId = Math.min(minProcId, procId);
106         maxProcId = Math.max(maxProcId, procId);
107 
108         // Deserialize the procedure
109         Procedure proc = Procedure.convert(entry.getValue());
110         procedures.put(procId, proc);
111       }
112 
113       // TODO: Some procedure may be already runnables (see readInitEntry())
114       //       (we can also check the "update map" in the log trackers)
115       log.setProcIds(minProcId, maxProcId);
116     }
117   }
118 
119   public Iterator<Procedure> getProcedures() {
120     return procedures.values().iterator();
121   }
122 
123   private void loadEntries(final ProcedureWALEntry entry) {
124     for (ProcedureProtos.Procedure proc: entry.getProcedureList()) {
125       maxProcId = Math.max(maxProcId, proc.getProcId());
126       if (isRequired(proc.getProcId())) {
127         if (LOG.isTraceEnabled()) {
128           LOG.trace("read " + entry.getType() + " entry " + proc.getProcId());
129         }
130         localProcedures.put(proc.getProcId(), proc);
131         tracker.setDeleted(proc.getProcId(), false);
132       }
133     }
134   }
135 
136   private void readInitEntry(final ProcedureWALEntry entry)
137       throws IOException {
138     assert entry.getProcedureCount() == 1 : "Expected only one procedure";
139     // TODO: Make it runnable, before reading other files
140     loadEntries(entry);
141   }
142 
143   private void readInsertEntry(final ProcedureWALEntry entry) throws IOException {
144     assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
145     loadEntries(entry);
146   }
147 
148   private void readUpdateEntry(final ProcedureWALEntry entry) throws IOException {
149     assert entry.getProcedureCount() == 1 : "Expected only one procedure";
150     loadEntries(entry);
151   }
152 
153   private void readDeleteEntry(final ProcedureWALEntry entry) throws IOException {
154     assert entry.getProcedureCount() == 0 : "Expected no procedures";
155     assert entry.hasProcId() : "expected ProcID";
156     if (LOG.isTraceEnabled()) {
157       LOG.trace("read delete entry " + entry.getProcId());
158     }
159     maxProcId = Math.max(maxProcId, entry.getProcId());
160     localProcedures.remove(entry.getProcId());
161     assert !procedures.containsKey(entry.getProcId());
162     tracker.setDeleted(entry.getProcId(), true);
163   }
164 
165   private boolean isDeleted(final long procId) {
166     return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
167   }
168 
169   private boolean isRequired(final long procId) {
170     return !isDeleted(procId) && !procedures.containsKey(procId);
171   }
172 }