001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
031
032/**
033 * Helper class that loads the procedures stored in a WAL.
034 */
035@InterfaceAudience.Private
036public class ProcedureWALFormatReader {
037  private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
038
039  /**
040   * We will use the localProcedureMap to track the active procedures for the current proc wal file,
041   * and when we finished reading one proc wal file, we will merge he localProcedureMap to the
042   * procedureMap, which tracks the global active procedures.
043   * <p/>
044   * See the comments of {@link WALProcedureMap} for more details.
045   * <p/>
046   * After reading all the proc wal files, we will use the procedures in the procedureMap to build a
047   * {@link WALProcedureTree}, and then give the result to the upper layer. See the comments of
048   * {@link WALProcedureTree} and the code in {@link #finish()} for more details.
049   */
050  private final WALProcedureMap localProcedureMap = new WALProcedureMap();
051  private final WALProcedureMap procedureMap = new WALProcedureMap();
052
053  private final ProcedureWALFormat.Loader loader;
054
055  /**
056   * Global tracker that will be used by the WALProcedureStore after load.
057   * If the last WAL was closed cleanly we already have a full tracker ready to be used.
058   * If the last WAL was truncated (e.g. master killed) the tracker will be empty
059   * and the 'partial' flag will be set. In this case, on WAL replay we are going
060   * to rebuild the tracker.
061   */
062  private final ProcedureStoreTracker tracker;
063
064  /**
065   * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build
066   * the list of procedures modified in that WAL because we need it for log cleaning purposes. If
067   * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see
068   * {@link WALProcedureStore#removeInactiveLogs()}).
069   * <p/>
070   * Notice that, the deleted part for this tracker will not be global valid as we can only count
071   * the deletes in the current file, but it is not big problem as finally, the above tracker will
072   * have the global state of deleted, and it will also be used to build the cleanup tracker.
073   */
074  private ProcedureStoreTracker localTracker;
075
076  private long maxProcId = 0;
077
078  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
079      ProcedureWALFormat.Loader loader) {
080    this.tracker = tracker;
081    this.loader = loader;
082  }
083
084  public void read(ProcedureWALFile log) throws IOException {
085    localTracker = log.getTracker();
086    if (localTracker.isPartial()) {
087      LOG.info("Rebuilding tracker for {}", log);
088    }
089
090    long count = 0;
091    FSDataInputStream stream = log.getStream();
092    try {
093      boolean hasMore = true;
094      while (hasMore) {
095        ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
096        if (entry == null) {
097          LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log);
098          break;
099        }
100        count++;
101        switch (entry.getType()) {
102          case PROCEDURE_WAL_INIT:
103            readInitEntry(entry);
104            break;
105          case PROCEDURE_WAL_INSERT:
106            readInsertEntry(entry);
107            break;
108          case PROCEDURE_WAL_UPDATE:
109          case PROCEDURE_WAL_COMPACT:
110            readUpdateEntry(entry);
111            break;
112          case PROCEDURE_WAL_DELETE:
113            readDeleteEntry(entry);
114            break;
115          case PROCEDURE_WAL_EOF:
116            hasMore = false;
117            break;
118          default:
119            throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
120        }
121      }
122      LOG.info("Read {} entries in {}", count, log);
123    } catch (InvalidProtocolBufferException e) {
124      LOG.error("While reading entry #{} in {}", count, log, e);
125      loader.markCorruptedWAL(log, e);
126    }
127
128    if (!localProcedureMap.isEmpty()) {
129      log.setProcIds(localProcedureMap.getMinModifiedProcId(),
130        localProcedureMap.getMaxModifiedProcId());
131      if (localTracker.isPartial()) {
132        localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
133          localProcedureMap.getMaxModifiedProcId());
134      }
135      procedureMap.merge(localProcedureMap);
136    }
137    // Do not reset the partial flag for local tracker, as here the local tracker only know the
138    // procedures which are modified in this file.
139  }
140
141  public void finish() throws IOException {
142    // notify the loader about the max proc ID
143    loader.setMaxProcId(maxProcId);
144
145    // build the procedure execution tree. When building we will verify that whether a procedure is
146    // valid.
147    WALProcedureTree tree = WALProcedureTree.build(procedureMap.getProcedures());
148    loader.load(tree.getValidProcs());
149    loader.handleCorrupted(tree.getCorruptedProcs());
150  }
151
152  private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
153    if (tracker.isPartial()) {
154      tracker.setDeleted(procId, true);
155    }
156  }
157
158  private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) {
159    if (tracker.isPartial()) {
160      tracker.insert(proc.getProcId());
161    }
162  }
163
164  private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) {
165    maxProcId = Math.max(maxProcId, proc.getProcId());
166    if (isRequired(proc.getProcId())) {
167      LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId());
168      localProcedureMap.add(proc);
169      insertIfPartial(tracker, proc);
170    }
171    insertIfPartial(localTracker, proc);
172  }
173
174  private void readInitEntry(ProcedureWALEntry entry) {
175    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
176    loadProcedure(entry, entry.getProcedure(0));
177  }
178
179  private void readInsertEntry(ProcedureWALEntry entry) {
180    assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
181    loadProcedure(entry, entry.getProcedure(0));
182    for (int i = 1; i < entry.getProcedureCount(); ++i) {
183      loadProcedure(entry, entry.getProcedure(i));
184    }
185  }
186
187  private void readUpdateEntry(ProcedureWALEntry entry) {
188    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
189    loadProcedure(entry, entry.getProcedure(0));
190  }
191
192  private void readDeleteEntry(ProcedureWALEntry entry) {
193    assert entry.hasProcId() : "expected ProcID";
194
195    if (entry.getChildIdCount() > 0) {
196      assert entry.getProcedureCount() == 1 : "Expected only one procedure";
197
198      // update the parent procedure
199      loadProcedure(entry, entry.getProcedure(0));
200
201      // remove the child procedures of entry.getProcId()
202      for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) {
203        deleteEntry(entry.getChildId(i));
204      }
205    } else {
206      assert entry.getProcedureCount() == 0 : "Expected no procedures";
207
208      // delete the procedure
209      deleteEntry(entry.getProcId());
210    }
211  }
212
213  private void deleteEntry(final long procId) {
214    LOG.trace("delete entry {}", procId);
215    maxProcId = Math.max(maxProcId, procId);
216    localProcedureMap.remove(procId);
217    assert !procedureMap.contains(procId);
218    setDeletedIfPartial(tracker, procId);
219    setDeletedIfPartial(localTracker, procId);
220  }
221
222  private boolean isDeleted(long procId) {
223    return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
224  }
225
226  private boolean isRequired(long procId) {
227    return !isDeleted(procId) && !procedureMap.contains(procId);
228  }
229}