/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2018-2023 Ping Identity Corporation
 */
package com.unboundid.directory.sdk.examples;

import com.unboundid.directory.sdk.common.types.LogSeverity;
import com.unboundid.directory.sdk.sync.api.ChangeDetector;
import com.unboundid.directory.sdk.sync.types.ChangeRecord;
import com.unboundid.directory.sdk.sync.types.EndpointException;
import com.unboundid.directory.sdk.sync.types.PostStepResult;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SyncSourceContext;
import com.unboundid.ldap.sdk.ChangeType;
import com.unboundid.ldap.sdk.DN;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.ldap.sdk.Modification;
import com.unboundid.ldif.LDIFChangeRecord;
import com.unboundid.ldif.LDIFException;
import com.unboundid.ldif.LDIFModifyChangeRecord;
import com.unboundid.ldif.LDIFModifyDNChangeRecord;
import com.unboundid.ldif.LDIFReader;
import com.unboundid.util.ObjectPair;
import com.unboundid.util.StaticUtils;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.DNArgument;
import com.unboundid.util.args.FileArgument;

import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;

/**
 * This class provides a simple example of an LDAP Change Detector which
 * uses an audit log from a PingDirectory instance as the source for
 * changes. More specifically, the extension will look for any files that begin
 * with "audit", and process them using an {@code LDIFReader}. This
 * extension assumes that the log files contain valid LDIF change records.
 * Just as with cn=changelog, the changes found by the Change Detector
 * are only used as hints about what has changed. The full entry is always
 * fetched from the source prior to calculating any changes for the destination.
 * <p>
 * The following arguments are defined:
 * <UL>
 *   <LI>log-files-dir -- The filesystem directory to monitor for log files</LI>
 *   <LI>processed-files-dir -- The filesystem directory where finished log
 *       files should be moved</LI>
 *   <LI>base-dn -- The base DN to use when creating LDAP entries from the log
 *       file content</LI>
 * </UL>
 */
public final class ExampleChangeDetector extends ChangeDetector
{
  // The server context which can be used for obtaining the server state,
  // logging, etc.
  private SyncServerContext serverContext;

  // The processor for the log file currently being read, or null when no
  // file is open.
  private LDIFProcessor ldifProcessor = null;

  // Files that have been completely read by getNextBatchOfChanges but not yet
  // moved to the processed directory. The value is the number of records that
  // were read from the file, which is also the position attached to the last
  // change record produced from it.
  private final Map<File, Long> pendingFiles = new ConcurrentHashMap<>();

  // The directory which we will monitor for new log files.
  private File logFilesDir;

  // The directory where we will move files after they have been processed.
  private File processedFilesDir;

  // The base DN to use for entries in the log files.
  private DN baseDN;

  // The file filter used to match log files in the log directory.
  private FileFilter logFilter;

  // A Comparator to sort the files from oldest to newest.
  private Comparator<File> oldestToNewestComparator;

  // The change number handed out to the next detected change.
  private final AtomicLong changeNumber = new AtomicLong(0);

  // A bookmark to keep position in the current file. The Long is the number
  // of records of that file which have already been consumed.
  private ObjectPair<File, Long> currentState;

  // The attachment key used to place the log filename on the ChangeRecord
  // so that acknowledgeCompletedOps() can reference it.
  private static final String FILE_ATTACHMENT_KEY = "File.Attachment";

  // The attachment key used to place the position within the log file on
  // the ChangeRecord when it is built.
  private static final String LOG_CHANGE_NUMBER_KEY = "Log.ChangeNumber";


  /**
   * Retrieves a human-readable name for this extension.
   *
   * @return  A human-readable name for this extension.
   */
  @Override
  public String getExtensionName()
  {
    return "Example LDAP Change Detector";
  }


  /**
   * Retrieves a human-readable description for this extension.  Each element
   * of the array that is returned will be considered a separate paragraph in
   * generated documentation.
   *
   * @return  A human-readable description for this extension, or {@code null}
   *          or an empty array if no description should be available.
   */
  @Override
  public String[] getExtensionDescription()
  {
    return new String[]{
         "This extension implements a Change Detector which " +
              "can be used to detect changes from a set of audit log " +
              "files in a given directory. The extension will look for " +
              "any files starting with 'audit' and interpret each " +
              "entry as a change to be processed. After a file has " +
              "been completely processed, it will be moved to a " +
              "separate directory."
    };
  }


  /**
   * Updates the provided argument parser to define any configuration arguments
   * which may be used by this extension.  The argument parser may also be
   * updated to define relationships between arguments (e.g. to specify
   * required, exclusive, or dependent argument sets).
   *
   * @param  parser  The argument parser to be updated with the configuration
   *                 arguments which may be used by this extension.
   * @throws  ArgumentException  If a problem is encountered while updating the
   *                             provided argument parser.
   */
  @Override
  public void defineConfigArguments(final ArgumentParser parser)
       throws ArgumentException
  {
    FileArgument newFilesArg = new FileArgument(
         'd', "log-files-dir", true, 1, "{path}",
         "The file system directory to monitor for log files",
         true, true, false, true);
    parser.addArgument(newFilesArg);

    FileArgument processedFilesArg = new FileArgument(
         'p', "processed-files-dir", true, 1, "{path}",
         "The file system directory where the extension should " +
              "move log files after it finishes processing them",
         true, true, false, true);
    parser.addArgument(processedFilesArg);

    DNArgument baseDNArg = new DNArgument('b', "base-dn", true, 1,
         "{base-dn}", "The base DN to use for entries in the log files");
    parser.addArgument(baseDNArg);
  }


  /**
   * This hook is called when a Sync Pipe first starts up or when the
   * set-startpoint subcommand is called from the <i>realtime-sync</i> command
   * line tool. Any initialization of this Change Detector should be performed
   * here. This method should generally store both the {@link SyncServerContext}
   * and the {@link SyncSourceContext} in a class member so that it can be used
   * elsewhere in the implementation.
   *
   * @param  serverContext      A handle to the server context for the server in
   *                            which this extension is running.
   * @param  syncSourceContext  An interface for interacting with the Sync
   *                            Source that owns this Change Detector.
   * @param  parser             The argument parser which has been initialized
   *                            from the configuration for this Change Detector.
   */
  @Override
  public void initializeChangeDetector(
       final SyncServerContext serverContext,
       final SyncSourceContext syncSourceContext,
       final ArgumentParser parser)
  {
    this.serverContext = serverContext;

    logFilter = new FileFilter()
    {
      @Override
      public boolean accept(final File pathname)
      {
        return pathname.isFile()
             && pathname.canRead()
             && pathname.getName().startsWith("audit");
      }
    };

    oldestToNewestComparator = new Comparator<File>()
    {
      @Override
      public int compare(final File f1, final File f2)
      {
        return Long.compare(f1.lastModified(), f2.lastModified());
      }
    };

    FileArgument newFilesArg =
         (FileArgument) parser.getNamedArgument("log-files-dir");
    logFilesDir = newFilesArg.getValue();
    debugInfo("Watching folder for logs: " + logFilesDir.getAbsolutePath());

    FileArgument processedFilesArg =
         (FileArgument) parser.getNamedArgument("processed-files-dir");
    processedFilesDir = processedFilesArg.getValue();
    debugInfo("Moving processed logs to: " +
         processedFilesDir.getAbsolutePath());

    DNArgument baseDNArg = (DNArgument) parser.getNamedArgument("base-dn");
    baseDN = baseDNArg.getValue();
    debugInfo("Ignoring changes outside the base dn: " +
         baseDN.toNormalizedString());
  }


  /**
   * This method should effectively set the starting point for synchronization
   * to the place specified by the <code>options</code> parameter. This should
   * cause all changes previous to the specified start point to be disregarded
   * and only changes after that point to be returned by
   * {@link #getNextBatchOfChanges(int, AtomicLong)}.
   * <p>
   * There are several different startpoint types (see
   * {@link SetStartpointOptions}), and this implementation is not required to
   * support them all. If the specified startpoint type is unsupported, this
   * method should throw an {@link UnsupportedOperationException}.
   *
   * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
   * must be supported by your implementation, because this is used when a Sync
   * Pipe first starts up. The {@link Serializable} in this case is the same
   * type that is returned by {@link #getStartpoint()}; the Sync Server persists
   * it and passes it back in on a restart.
   * <p>
   * This method can be called from two different contexts:
   * <ul>
   * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
   * (the Sync Pipe is required to be stopped in this context)</li>
   * <li>Immediately after a Sync Pipe starts up and a connection is first
   * established to the source server (e.g. before the first call to
   * {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
   * </ul>
   * <p>
   * An empty log directory is not an error: the bookmark is simply cleared and
   * files are picked up by {@link #getNextBatchOfChanges(int, AtomicLong)} as
   * soon as they appear.
   *
   * @param  options  an object which indicates where exactly to start
   *                  synchronizing (e.g. the end of the changelog, specific
   *                  change number, a certain time ago, etc)
   * @throws  EndpointException  if there is any error while setting the
   *                             start point
   * @throws  UnsupportedOperationException  if the startpoint type is not
   *                                         supported by this extension
   */
  @Override
  public void setStartpoint(final SetStartpointOptions options)
       throws EndpointException
  {
    switch (options.getStartpointType())
    {
      case BEGINNING_OF_CHANGELOG:
      {
        debugInfo("Setting startpoint to beginning of the changelog.");
        final File[] files = listLogFilesOldestFirst();

        // Every file still in the directory must be read again.
        pendingFiles.clear();

        // Set our incrementing change number back to 0
        changeNumber.set(0);

        // Bookmark the beginning of the oldest file, if there is one.
        ObjectPair<File, Long> bookmark = null;
        if (files.length > 0)
        {
          bookmark = new ObjectPair<>(files[0], 0L);
        }
        resumeAt(bookmark);
        break;
      }

      case END_OF_CHANGELOG:
      {
        debugInfo("Setting startpoint to end of changelog.");
        final File[] files = listLogFilesOldestFirst();

        // Release the current file before moving it, and drop the bookmark
        // whether or not a file was open.
        resumeAt(null);

        // Move all files to the processed files directory
        for (final File f : files)
        {
          moveCompletedFile(f);
        }
        pendingFiles.clear();
        break;
      }

      case RESUME_AT_CHANGE_TIME:
      {
        final long time = options.getChangeTime().getTime();
        debugInfo("Setting startpoint to resume at time " + time);
        final File[] files = listLogFilesOldestFirst();

        // Release the current file before any file is moved.
        closeLDIFProcessor();

        // This will only be an approximation as the previous file
        // may contain changes that occurred after the specified
        // resume time.
        ObjectPair<File, Long> bookmark = null;
        for (final File f : files)
        {
          if (f.lastModified() < time)
          {
            moveCompletedFile(f);
          }
          else
          {
            // Bookmark the file nearest the specified time
            bookmark = new ObjectPair<>(f, 0L);
            break;
          }
        }

        // The remaining files are all read again from the bookmark onwards.
        pendingFiles.clear();
        resumeAt(bookmark);
        break;
      }

      case RESUME_AT_SERIALIZABLE:
      {
        debugInfo("Setting startpoint to resume at serialized state.");
        ObjectPair<File, Long> bookmark = currentState;
        if (bookmark == null)
        {
          // NOTE(review): as in the original implementation, the persisted
          // token is ignored when this instance holds no bookmark and the
          // oldest remaining file is read from its first record. The bookmark
          // advances before changes are acknowledged, so trusting the token
          // here could skip unacknowledged changes -- confirm before changing.
          final File[] files = listLogFilesOldestFirst();
          if (files.length > 0)
          {
            bookmark = new ObjectPair<>(files[0], 0L);
          }
          changeNumber.set(0);
        }
        else
        {
          // Get the current bookmark from the options. Tokens of an
          // unexpected type, or without a file, leave the bookmark untouched.
          final Serializable token = options.getSerializableValue();
          if (token instanceof ChangeDetectorState)
          {
            final ChangeDetectorState state = (ChangeDetectorState) token;
            if (state.getFile() != null)
            {
              bookmark =
                   new ObjectPair<>(state.getFile(), state.getPosition());
            }
          }
        }
        resumeAt(bookmark);
        break;
      }

      default:
        throw new UnsupportedOperationException(
             "This startpoint type is not supported: " +
                  options.getStartpointType().toString());
    }
  }


  /**
   * Gets the current value of the startpoint for change detection. This is the
   * "bookmark" which indicates which changes have already been processed and
   * which have not. In most cases, a change number is used to detect changes
   * and is managed by the Data Sync Server, in which case this
   * implementation needs only to return the latest acknowledged
   * change number. In other cases, the return value may correspond to a
   * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
   * In any case, this method should return the value that is updated by
   * {@link #acknowledgeCompletedOps(LinkedList)}.
   * <p>
   * This method is called periodically and the return value is saved in the
   * persistent state for the Sync Pipe that uses this extension as a Change
   * Detector.
   *
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * {@link #acknowledgeCompletedOps(LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs.
   *
   * @return a value to store in the persistent state for the Sync Pipe. This is
   *         usually a change number, but if a changelog table is not used to
   *         detect changes, this value should represent some other token to
   *         pass into {@link #setStartpoint(SetStartpointOptions)}
   *         when the sync pipe starts up. When there is currently no bookmark
   *         the returned state has a {@code null} file.
   */
  @Override
  public Serializable getStartpoint()
  {
    // As the bookmark contains both the file and change number it contains
    // enough information to be a useful startpoint. createState tolerates the
    // null bookmark that exists whenever all known files have been drained.
    return ChangeDetectorState.createState(currentState);
  }


  /**
   * Return the next batch of change records from the source. Change records
   * are usually just hints that a change happened; they do not include
   * the full contents of the target entry. In an effort to never synchronize
   * stale data, the Data Sync Server will go back and fetch the full
   * target entry for each change record.
   * <p>
   * On the first invocation, this should return changes starting from the
   * startpoint that was set by
   * {@link #setStartpoint(SetStartpointOptions)}. This method is also
   * responsible for updating the internal state such that subsequent
   * invocations do not return duplicate changes.
   * <p>
   * The resulting list should be limited by <code>maxChanges</code>. The
   * <code>numStillPending</code> reference should be set to the estimated
   * number of changes that haven't yet been retrieved from the source endpoint
   * when this method returns, or zero if all the current changes have been
   * retrieved.
   *
   * <b>IMPORTANT</b>: While this method needs to keep track of which changes
   * have already been returned so that it does not return them again, it should
   * <b>NOT</b> modify the official startpoint. The internal value for the
   * startpoint should only be updated after a sync operation is acknowledged
   * back to this extension (via
   * {@link #acknowledgeCompletedOps(LinkedList)}).
   * Otherwise it will be possible for changes to be missed when the
   * Data Sync Server is restarted or a connection error occurs. The
   * startpoint should not change as a result of this method.
   * <p>
   * This method <b>does not need to be thread-safe</b>. It will be invoked
   * repeatedly by a single thread, based on the polling interval set in the
   * Sync Pipe configuration.
   *
   * @param maxChanges       the maximum number of changes to retrieve
   * @param numStillPending  this should be set to the number of unretrieved
   *                         changes that are still pending after this batch
   *                         has been retrieved. This will be passed in as zero,
   *                         and may be left that way if the actual value cannot
   *                         be determined.
   * @return a list of {@link ChangeRecord} instances, each
   *         corresponding to a single change at the source endpoint.
   *         If there are no new changes to return, this method should return
   *         an empty list.
   * @throws EndpointException if there is any error while retrieving the
   *                           next batch of changes
   */
  @Override
  public List<ChangeRecord> getNextBatchOfChanges(
       final int maxChanges,
       final AtomicLong numStillPending)
       throws EndpointException
  {
    final List<ChangeRecord> results = new ArrayList<>();
    boolean morePending = false;

    // The file and position reached by the processor that was used last.
    File currentFile = null;
    long currentFilePosition = 0L;

    try
    {
      if (ldifProcessor != null)
      {
        // We still have an unexhausted LDIFProcessor, keep processing
        morePending = processChanges(results, maxChanges);

        final ObjectPair<File, Long> reached = ldifProcessor.getCurrentState();
        currentFile = reached.getFirst();
        currentFilePosition = reached.getSecond();

        if (!morePending)
        {
          // The file is exhausted. Remember it together with its record
          // count so that it is not read again and can be moved once its
          // last change has been acknowledged.
          pendingFiles.put(currentFile, currentFilePosition);
        }
      }

      // If we haven't reached the limit for the number of results to
      // return we will continue with the files not yet read.
      if (results.size() < maxChanges)
      {
        for (final File file : listLogFilesOldestFirst())
        {
          if (pendingFiles.containsKey(file) || !file.exists())
          {
            // File has already been processed.
            continue;
          }

          // Initialize the LDIFProcessor with the current file
          // and start at the beginning.
          initializeLDIFProcessor(file, 0);
          morePending = processChanges(results, maxChanges);

          // Keep track of our current file and position for later processing.
          final ObjectPair<File, Long> reached =
               ldifProcessor.getCurrentState();
          currentFile = reached.getFirst();
          currentFilePosition = reached.getSecond();

          if (!morePending)
          {
            // Every exhausted file is recorded, not only the last one of the
            // batch; otherwise it would be read again on the next poll.
            pendingFiles.put(currentFile, currentFilePosition);
          }

          if (results.size() >= maxChanges)
          {
            // Don't look at any more files if we've reached the maximum number
            // of changes for this batch.
            break;
          }
        }
      }
    }
    catch (IOException | LDIFException e)
    {
      throw new EndpointException(
           PostStepResult.ABORT_OPERATION,
           "Problem with LDIFProcessor: " +
                StaticUtils.getExceptionMessage(e),
           e);
    }
    catch (LDAPException e)
    {
      throw new EndpointException(e);
    }

    if (morePending)
    {
      // Need to indicate that there are more changes to get.
      numStillPending.set(1);

      // Update our bookmark
      currentState = new ObjectPair<>(currentFile, currentFilePosition);
    }
    else
    {
      // We've processed all the currently held files.
      currentState = null;
      closeLDIFProcessor();
    }

    return results;
  }


  /**
   * Processes changes from the audit log file that is currently open and
   * appends them to a list of {@code ChangeRecord}s.
   *
   * @param results    The list of ChangeRecords to append to.
   * @param maxChanges The maximum number of changes the list may hold.
   *
   * @return True if the batch limit was reached and the current file may
   *         therefore hold more changes; false if the file was read to its
   *         end.
   *
   * @throws IOException   If unable to read file.
   * @throws LDIFException If error processing the file as LDIF records.
   * @throws LDAPException If unable to parse the DN from a change record.
   */
  private boolean processChanges(final List<ChangeRecord> results,
                                 final int maxChanges)
       throws IOException, LDIFException, LDAPException
  {
    // Iterate through all of the change records in the current file
    while (ldifProcessor.hasNext())
    {
      final LDIFChangeRecord ldifChangeRecord = ldifProcessor.next();
      final ObjectPair<File, Long> processorState =
           ldifProcessor.getCurrentState();

      // Skip ahead until we get to the stated position in currentState. The
      // bookmark only applies to the file it was taken from; applying it to
      // another file would drop that file's leading records.
      if (currentState != null
           && processorState.getFirst().equals(currentState.getFirst())
           && currentState.getSecond() > processorState.getSecond())
      {
        continue;
      }

      if (!baseDN.isAncestorOf(ldifChangeRecord.getParsedDN(), false))
      {
        serverContext.logMessage(LogSeverity.DEBUG,
             "Ignoring out-of-scope change: " + ldifChangeRecord.getDN());
        continue;
      }

      final ChangeRecord.Builder bldr = new ChangeRecord.Builder(
           ldifChangeRecord.getChangeType(),
           ldifChangeRecord.getParsedDN());

      // If the change is a moddn then set the identifiableInfoAfterChange
      if (ldifChangeRecord.getChangeType() == ChangeType.MODIFY_DN)
      {
        final DN dnAfterChange =
             ((LDIFModifyDNChangeRecord) ldifChangeRecord).getNewDN();
        bldr.identifiableInfoAfterChange(dnAfterChange);
        bldr.changedAttributes(dnAfterChange.getRDN().getAttributeNames());
      }

      // If the change is a modification add all of the changed attribute
      // names to the ChangeRecord.
      if (ldifChangeRecord.getChangeType() == ChangeType.MODIFY)
      {
        final Modification[] mods =
             ((LDIFModifyChangeRecord) ldifChangeRecord).getModifications();
        final List<String> changedAttrNames = new ArrayList<>(mods.length);
        for (final Modification mod : mods)
        {
          changedAttrNames.add(mod.getAttributeName());
        }
        bldr.changedAttributes(changedAttrNames.toArray(new String[0]));
      }

      // This example is using the system time as an approximation of
      // the actual change time. If a true change time is available it
      // is best to use that.
      bldr.changeTime(System.currentTimeMillis());
      bldr.changeNumber(changeNumber.getAndAdd(1));

      // Attach the current file and file position so that we can
      // update the internal state in acknowledgeCompletedOps.
      bldr.addProperty(FILE_ATTACHMENT_KEY, processorState.getFirst());
      bldr.addProperty(LOG_CHANGE_NUMBER_KEY, processorState.getSecond());

      final ChangeRecord change = bldr.build();
      debugInfo("Found change: " + change.toString());
      results.add(change);

      if (results.size() >= maxChanges)
      {
        // Don't look at any more rows in this file if we've reached the
        // maximum number of changes for this batch.
        return true;
      }
    }

    return false;
  }


  /**
   * Provides a way for the Data Sync Server to acknowledge back to the
   * extension which sync operations it has processed. This method should update
   * the official startpoint which was set by
   * {@link #setStartpoint(SetStartpointOptions)} and is
   * returned by {@link #getStartpoint()}.
   *
   * <b>IMPORTANT</b>: The internal value for the startpoint should only be
   * updated after a sync operation is acknowledged back to this extension (via
   * this method). Otherwise it will be possible for changes to be missed when
   * the Data Sync Server is restarted or a connection error occurs.
   * <p>
   * This implementation moves every completely read file that is older than
   * the file of the last acknowledged change, and that file itself when the
   * acknowledged change was its last record.
   *
   * @param completedOps a list of {@link SyncOperation}s that have finished
   *                     processing. The records are listed in the order they
   *                     were first detected.
   * @throws EndpointException if there is an error acknowledging the changes
   *                           back to the source
   */
  @Override
  public void acknowledgeCompletedOps(
       final LinkedList<SyncOperation> completedOps)
       throws EndpointException
  {
    if (completedOps == null || completedOps.isEmpty())
    {
      // Nothing was acknowledged.
      return;
    }

    final ChangeRecord lastRecord = completedOps.getLast().getChangeRecord();
    final File sourceFile = (File) lastRecord.getProperty(FILE_ATTACHMENT_KEY);
    final Long logChangeNumber =
         (Long) lastRecord.getProperty(LOG_CHANGE_NUMBER_KEY);
    if (sourceFile == null)
    {
      // The record carries no file attachment, so there is nothing to move.
      return;
    }

    final long sourceModified = sourceFile.lastModified();
    for (final File f : listLogFilesOldestFirst())
    {
      if (f.lastModified() < sourceModified && pendingFiles.containsKey(f))
      {
        moveCompletedFile(f);
        pendingFiles.remove(f);
      }
    }

    // Compare by value: both sides are boxed Longs, for which == would test
    // object identity.
    if (logChangeNumber != null
         && logChangeNumber.equals(pendingFiles.get(sourceFile)))
    {
      moveCompletedFile(sourceFile);
      pendingFiles.remove(sourceFile);
    }
  }


  /**
   * Lists the log files currently in the monitored directory, sorted from
   * oldest to newest by modification time.
   *
   * @return the matching files, possibly an empty array but never null
   * @throws EndpointException if the directory cannot be listed
   */
  private File[] listLogFilesOldestFirst() throws EndpointException
  {
    final File[] files = logFilesDir.listFiles(logFilter);
    if (files == null)
    {
      throw new EndpointException(
           PostStepResult.ABORT_OPERATION,
           "Unable to open log file directory " +
                logFilesDir.getAbsolutePath());
    }
    Arrays.sort(files, oldestToNewestComparator);
    return files;
  }


  /**
   * Replaces the bookmark and re-opens the LDIFProcessor to match it. Any
   * processor that is currently open is closed first.
   *
   * @param bookmark the file and position to resume at, or null to clear the
   *                 bookmark and leave no file open
   * @throws EndpointException if the LDIFProcessor cannot be initialized
   */
  private void resumeAt(final ObjectPair<File, Long> bookmark)
       throws EndpointException
  {
    closeLDIFProcessor();
    currentState = bookmark;
    if (bookmark != null)
    {
      initializeLDIFProcessor(bookmark);
    }
  }


  /**
   * Closes the current LDIFProcessor, if any, and clears the reference to it.
   * A failure to close is logged and otherwise ignored because the processor
   * is being discarded anyway.
   */
  private void closeLDIFProcessor()
  {
    final LDIFProcessor processor = ldifProcessor;
    ldifProcessor = null;
    if (processor == null)
    {
      return;
    }

    try
    {
      processor.close();
    }
    catch (IOException e)
    {
      serverContext.logMessage(LogSeverity.MILD_ERROR,
           "Unable to close LDIFProcessor. " + e.getLocalizedMessage());
    }
  }


  /**
   * Moves a file that we have finished processing to the processed files
   * directory, replacing any file of the same name that is already there.
   *
   * @param srcFile the file to move
   * @throws EndpointException if there is a problem moving the file
   */
  private void moveCompletedFile(final File srcFile) throws EndpointException
  {
    final File destFile = new File(processedFilesDir, srcFile.getName());
    try
    {
      debugInfo("Moving file " + srcFile.getAbsolutePath() +
           " to " + destFile.getAbsolutePath());
      Files.move(srcFile.toPath(), destFile.toPath(), REPLACE_EXISTING);
    }
    catch (Exception e)
    {
      throw new EndpointException(PostStepResult.ABORT_OPERATION,
           "Could not move " + srcFile.getAbsolutePath() +
                " to " + destFile.getAbsolutePath(), e);
    }
  }


  /**
   * Initializes an {@code LDIFProcessor} for the given file, closing the
   * processor that is currently open, if any. When initialization fails no
   * processor is left open.
   *
   * @param file             The file to initialize the LDIFProcessor.
   * @param startingPosition The zero indexed change from which to begin
   *                         processing.
   * @throws EndpointException if the LDIFProcessor cannot be successfully
   *                           initialized.
   */
  private void initializeLDIFProcessor(final File file,
                                       final long startingPosition)
       throws EndpointException
  {
    closeLDIFProcessor();

    try
    {
      debugInfo("Initializing LDIFProcessor with " + file.getAbsolutePath() +
           " starting at change " + startingPosition);
      ldifProcessor = new LDIFProcessor(file, startingPosition);
    }
    catch (IOException | LDIFException e)
    {
      throw new EndpointException(
           PostStepResult.ABORT_OPERATION,
           "Unable to initialize LDIFProcessor! " + e.getLocalizedMessage(),
           e);
    }
  }


  /**
   * Initializes an {@code LDIFProcessor} using the given state.
   *
   * @param state An ObjectPair containing a File and position information.
   * @throws EndpointException if the LDIFProcessor cannot be successfully
   *                           initialized.
   */
  private void initializeLDIFProcessor(final ObjectPair<File, Long> state)
       throws EndpointException
  {
    initializeLDIFProcessor(state.getFirst(), state.getSecond());
  }


  /**
   * Wrapper for logging debug messages. Falls back to standard output when
   * no server context is available.
   *
   * @param message The message to log.
   */
  private void debugInfo(final String message)
  {
    if (serverContext != null)
    {
      serverContext.debugInfo(message);
    }
    else
    {
      System.out.println(message);
    }
  }


  /**
   * A Serializable Wrapper class to contain the state stored in
   * {@code currentState}. This is used to retrieve the Change Detector's
   * state when resuming from being shut down.
   */
  static final class ChangeDetectorState implements Serializable
  {
    private static final long serialVersionUID = -6088093761035832413L;

    // The log file of the bookmark, or null when there is no bookmark.
    private final File file;

    // The number of records of the file that have been consumed.
    private final long position;


    /**
     * Private Constructor to prevent instantiation.
     *
     * @param file     File part of state.
     * @param position Change position within the file.
     */
    private ChangeDetectorState(final File file, final long position)
    {
      this.file = file;
      this.position = position;
    }


    /**
     * Creates a ChangeDetectorState from an {@code ObjectPair<File, Long>}.
     *
     * @param state The current state, or null when there is no bookmark.
     * @return The resulting ChangeDetectorState. For a null state (or a state
     *         without a position) the missing parts are represented by a null
     *         file and position 0.
     */
    static ChangeDetectorState createState(final ObjectPair<File, Long> state)
    {
      if (state == null)
      {
        return new ChangeDetectorState(null, 0L);
      }

      final Long position = state.getSecond();
      return new ChangeDetectorState(
           state.getFirst(),
           position == null ? 0L : position.longValue());
    }


    /**
     * Get the file portion of the state.
     *
     * @return a file, or null when the state holds no bookmark.
     */
    File getFile()
    {
      return file;
    }


    /**
     * Get the position portion of the state.
     *
     * @return the position.
     */
    long getPosition()
    {
      return position;
    }
  }


  /**
   * Utility class to manage an LDIFReader and its position in the file
   * currently being processed.
   */
  static final class LDIFProcessor
  {
    private final LDIFReader ldifReader;
    private final File file;

    // Tracks how many changes have been
    // processed in the current file.
    private long currentPosition;

    // The record that the next call to next() returns, and the one after it.
    private LDIFChangeRecord currentRecord;
    private LDIFChangeRecord nextRecord;


    /**
     * Constructs a new LDIFProcessor. The reader is closed again when the
     * initial reads fail.
     *
     * @param file             The file to process.
     * @param startingPosition The desired starting position, i.e. the number
     *                         of leading records to discard.
     *
     * @throws IOException   If unable to open the file.
     * @throws LDIFException If unable to read changes as LDIF records.
     */
    LDIFProcessor(final File file, final long startingPosition)
         throws IOException, LDIFException
    {
      this.file = file;
      this.ldifReader = new LDIFReader(file);
      this.currentPosition = startingPosition;

      boolean initialized = false;
      try
      {
        // If starting later in the file discard until the desired change.
        // The records are read the same way as the ones that are returned so
        // that both agree on what a valid record is.
        boolean endOfFile = false;
        for (long skipped = 0; skipped < startingPosition; skipped++)
        {
          if (ldifReader.readChangeRecord(true) == null)
          {
            endOfFile = true;
            break;
          }
        }

        if (!endOfFile)
        {
          currentRecord = ldifReader.readChangeRecord(true);
          if (currentRecord != null)
          {
            nextRecord = ldifReader.readChangeRecord(true);
          }
        }
        initialized = true;
      }
      finally
      {
        if (!initialized)
        {
          try
          {
            ldifReader.close();
          }
          catch (IOException ignored)
          {
            // The read failure that is already propagating is the one worth
            // reporting.
          }
        }
      }
    }


    /**
     * Gets the current state of this LDIFProcessor. Calling this method does
     * not change the state.
     *
     * @return An ObjectPair containing the current file and position.
     */
    ObjectPair<File, Long> getCurrentState()
    {
      return new ObjectPair<>(file, currentPosition);
    }


    /**
     * Close the LDIFProcessor.
     *
     * @throws IOException If error closing LDIFReader.
     */
    void close() throws IOException
    {
      ldifReader.close();
    }


    /**
     * Returns true if the LDIFProcessor has more changes.
     *
     * @return True if more changes.
     */
    boolean hasNext()
    {
      return currentRecord != null;
    }


    /**
     * Get next LDIFChangeRecord. The position only advances when a record is
     * actually returned.
     *
     * @return The next change record or null.
     *
     * @throws IOException   If error reading from file.
     * @throws LDIFException If error parsing file as LDIF.
     */
    LDIFChangeRecord next() throws IOException, LDIFException
    {
      final LDIFChangeRecord toReturn = currentRecord;
      if (toReturn == null)
      {
        return null;
      }

      currentRecord = nextRecord;
      nextRecord = null;
      if (currentRecord != null)
      {
        nextRecord = ldifReader.readChangeRecord(true);
      }
      currentPosition++;
      return toReturn;
    }
  }
}