001/**
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.procedure2.store.wal;
019
020import java.io.IOException;
021import org.apache.hadoop.fs.FSDataInputStream;
022import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
023import org.apache.yetus.audience.InterfaceAudience;
024import org.slf4j.Logger;
025import org.slf4j.LoggerFactory;
026
027import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
028
029import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
030import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureWALEntry;
031
032/**
033 * Helper class that loads the procedures stored in a WAL.
034 * @deprecated Since 2.3.0, will be removed in 4.0.0. Keep here only for rolling upgrading, now we
035 *             use the new region based procedure store.
036 */
037@Deprecated
038@InterfaceAudience.Private
039class ProcedureWALFormatReader {
040  private static final Logger LOG = LoggerFactory.getLogger(ProcedureWALFormatReader.class);
041
042  /**
043   * We will use the localProcedureMap to track the active procedures for the current proc wal file,
044   * and when we finished reading one proc wal file, we will merge he localProcedureMap to the
045   * procedureMap, which tracks the global active procedures.
046   * <p/>
047   * See the comments of {@link WALProcedureMap} for more details.
048   * <p/>
049   * After reading all the proc wal files, we will use the procedures in the procedureMap to build a
050   * {@link ProcedureTree}, and then give the result to the upper layer. See the comments of
051   * {@link ProcedureTree} and the code in {@link #finish()} for more details.
052   */
053  private final WALProcedureMap localProcedureMap = new WALProcedureMap();
054  private final WALProcedureMap procedureMap = new WALProcedureMap();
055
056  private final ProcedureWALFormat.Loader loader;
057
058  /**
059   * Global tracker that will be used by the WALProcedureStore after load.
060   * If the last WAL was closed cleanly we already have a full tracker ready to be used.
061   * If the last WAL was truncated (e.g. master killed) the tracker will be empty
062   * and the 'partial' flag will be set. In this case, on WAL replay we are going
063   * to rebuild the tracker.
064   */
065  private final ProcedureStoreTracker tracker;
066
067  /**
068   * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we re-build
069   * the list of procedures modified in that WAL because we need it for log cleaning purposes. If
070   * all procedures modified in a WAL are found to be obsolete, it can be safely deleted. (see
071   * {@link WALProcedureStore#removeInactiveLogs()}).
072   * <p/>
073   * Notice that, the deleted part for this tracker will not be global valid as we can only count
074   * the deletes in the current file, but it is not big problem as finally, the above tracker will
075   * have the global state of deleted, and it will also be used to build the cleanup tracker.
076   */
077  private ProcedureStoreTracker localTracker;
078
079  private long maxProcId = 0;
080
081  public ProcedureWALFormatReader(final ProcedureStoreTracker tracker,
082      ProcedureWALFormat.Loader loader) {
083    this.tracker = tracker;
084    this.loader = loader;
085  }
086
087  public void read(ProcedureWALFile log) throws IOException {
088    localTracker = log.getTracker();
089    if (localTracker.isPartial()) {
090      LOG.info("Rebuilding tracker for {}", log);
091    }
092
093    long count = 0;
094    FSDataInputStream stream = log.getStream();
095    try {
096      boolean hasMore = true;
097      while (hasMore) {
098        ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream);
099        if (entry == null) {
100          LOG.warn("Nothing left to decode. Exiting with missing EOF, log={}", log);
101          break;
102        }
103        count++;
104        switch (entry.getType()) {
105          case PROCEDURE_WAL_INIT:
106            readInitEntry(entry);
107            break;
108          case PROCEDURE_WAL_INSERT:
109            readInsertEntry(entry);
110            break;
111          case PROCEDURE_WAL_UPDATE:
112          case PROCEDURE_WAL_COMPACT:
113            readUpdateEntry(entry);
114            break;
115          case PROCEDURE_WAL_DELETE:
116            readDeleteEntry(entry);
117            break;
118          case PROCEDURE_WAL_EOF:
119            hasMore = false;
120            break;
121          default:
122            throw new CorruptedWALProcedureStoreException("Invalid entry: " + entry);
123        }
124      }
125      LOG.info("Read {} entries in {}", count, log);
126    } catch (InvalidProtocolBufferException e) {
127      LOG.error("While reading entry #{} in {}", count, log, e);
128      loader.markCorruptedWAL(log, e);
129    }
130
131    if (!localProcedureMap.isEmpty()) {
132      log.setProcIds(localProcedureMap.getMinModifiedProcId(),
133        localProcedureMap.getMaxModifiedProcId());
134      if (localTracker.isPartial()) {
135        localTracker.setMinMaxModifiedProcIds(localProcedureMap.getMinModifiedProcId(),
136          localProcedureMap.getMaxModifiedProcId());
137      }
138      procedureMap.merge(localProcedureMap);
139    }
140    // Do not reset the partial flag for local tracker, as here the local tracker only know the
141    // procedures which are modified in this file.
142  }
143
144  public void finish() throws IOException {
145    // notify the loader about the max proc ID
146    loader.setMaxProcId(maxProcId);
147
148    // build the procedure execution tree. When building we will verify that whether a procedure is
149    // valid.
150    ProcedureTree tree = ProcedureTree.build(procedureMap.getProcedures());
151    loader.load(tree.getValidProcs());
152    loader.handleCorrupted(tree.getCorruptedProcs());
153  }
154
155  private void setDeletedIfPartial(ProcedureStoreTracker tracker, long procId) {
156    if (tracker.isPartial()) {
157      tracker.setDeleted(procId, true);
158    }
159  }
160
161  private void insertIfPartial(ProcedureStoreTracker tracker, ProcedureProtos.Procedure proc) {
162    if (tracker.isPartial()) {
163      tracker.insert(proc.getProcId());
164    }
165  }
166
167  private void loadProcedure(ProcedureWALEntry entry, ProcedureProtos.Procedure proc) {
168    maxProcId = Math.max(maxProcId, proc.getProcId());
169    if (isRequired(proc.getProcId())) {
170      LOG.trace("Read {} entry {}", entry.getType(), proc.getProcId());
171      localProcedureMap.add(proc);
172      insertIfPartial(tracker, proc);
173    }
174    insertIfPartial(localTracker, proc);
175  }
176
177  private void readInitEntry(ProcedureWALEntry entry) {
178    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
179    loadProcedure(entry, entry.getProcedure(0));
180  }
181
182  private void readInsertEntry(ProcedureWALEntry entry) {
183    assert entry.getProcedureCount() >= 1 : "Expected one or more procedures";
184    loadProcedure(entry, entry.getProcedure(0));
185    for (int i = 1; i < entry.getProcedureCount(); ++i) {
186      loadProcedure(entry, entry.getProcedure(i));
187    }
188  }
189
190  private void readUpdateEntry(ProcedureWALEntry entry) {
191    assert entry.getProcedureCount() == 1 : "Expected only one procedure";
192    loadProcedure(entry, entry.getProcedure(0));
193  }
194
195  private void readDeleteEntry(ProcedureWALEntry entry) {
196    assert entry.hasProcId() : "expected ProcID";
197
198    if (entry.getChildIdCount() > 0) {
199      assert entry.getProcedureCount() == 1 : "Expected only one procedure";
200
201      // update the parent procedure
202      loadProcedure(entry, entry.getProcedure(0));
203
204      // remove the child procedures of entry.getProcId()
205      for (int i = 0, count = entry.getChildIdCount(); i < count; ++i) {
206        deleteEntry(entry.getChildId(i));
207      }
208    } else {
209      assert entry.getProcedureCount() == 0 : "Expected no procedures";
210
211      // delete the procedure
212      deleteEntry(entry.getProcId());
213    }
214  }
215
216  private void deleteEntry(final long procId) {
217    LOG.trace("delete entry {}", procId);
218    maxProcId = Math.max(maxProcId, procId);
219    localProcedureMap.remove(procId);
220    assert !procedureMap.contains(procId);
221    setDeletedIfPartial(tracker, procId);
222    setDeletedIfPartial(localTracker, procId);
223  }
224
225  private boolean isDeleted(long procId) {
226    return tracker.isDeleted(procId) == ProcedureStoreTracker.DeleteState.YES;
227  }
228
229  private boolean isRequired(long procId) {
230    return !isDeleted(procId) && !procedureMap.contains(procId);
231  }
232}