() {
public int compare(final File f1, final File f2) {
if (f1.lastModified() - f2.lastModified() > 0) {
return 1;
} else if (f1.lastModified() - f2.lastModified() < 0) {
return -1;
} else {
return 0;
}
}
};
FileArgument newFilesArg =
(FileArgument) parser.getNamedArgument("log-files-dir");
logFilesDir = newFilesArg.getValue();
debugInfo("Watching folder for logs: "
+ logFilesDir.getAbsolutePath());
FileArgument processedFilesArg =
(FileArgument) parser.getNamedArgument("processed-files-dir");
processedFilesDir = processedFilesArg.getValue();
debugInfo("Moving processed logs to: "
+ processedFilesDir.getAbsolutePath());
DNArgument baseDNArg = (DNArgument) parser.getNamedArgument("base-dn");
baseDN = baseDNArg.getValue();
debugInfo("Ignoring changes outside the base dn: "
+ baseDN.toNormalizedString());
}
/**
* This method should effectively set the starting point for synchronization
* to the place specified by the options
parameter. This should
* cause all changes previous to the specified start point to be disregarded
* and only changes after that point to be returned by
* {@link #getNextBatchOfChanges(int, AtomicLong)}.
*
* There are several different startpoint types (see
* {@link SetStartpointOptions}), and this implementation is not required to
* support them all. If the specified startpoint type is unsupported, this
* method should throw an {@link UnsupportedOperationException}.
*
* IMPORTANT: The RESUME_AT_SERIALIZABLE
startpoint type
* must be supported by your implementation, because this is used when a Sync
* Pipe first starts up. The {@link Serializable} in this case is the same
* type that is returned by {@link #getStartpoint()}; the Sync Server persists
* it and passes it back in on a restart.
*
* This method can be called from two different contexts:
*
* - When the 'set-startpoint' subcommand of the realtime-sync CLI is used
* (the Sync Pipe is required to be stopped in this context)
* - Immediately after a Sync Pipe starts up and a connection is first
* established to the source server (e.g. before the first call to
* {@link #getNextBatchOfChanges(int, AtomicLong)})
*
*
* @param options an object which indicates where exactly to start
* synchronizing (e.g. the end of the changelog, specific
* change number, a certain time ago, etc)
* @throws EndpointException if there is any error while setting the
* start point
*/
@Override
public void setStartpoint(final SetStartpointOptions options)
throws EndpointException {
File[] files;
switch (options.getStartpointType()) {
case BEGINNING_OF_CHANGELOG:
debugInfo("Setting startpoint to beginning of the changelog.");
files = logFilesDir.listFiles(logFilter);
if (files == null) {
throw new EndpointException(
PostStepResult.ABORT_OPERATION,
"Unable to open log file directory "
+ logFilesDir.getAbsolutePath());
}
// Sort the files from oldest to newest
Arrays.sort(files, oldestToNewestComparator);
// Set the bookmark to the beginning of the first file
currentState = new ObjectPair<>(files[0], 0L);
// Initialize the LDIFReader
initializeLDIFProcessor(files[0], 0);
// Set our incrementing change number back to 0
changeNumber.set(0);
break;
case END_OF_CHANGELOG:
debugInfo("Setting startpoint to end of changelog.");
// Move all files to the processed files directory
files = logFilesDir.listFiles(logFilter);
if (files == null) {
throw new EndpointException(
PostStepResult.ABORT_OPERATION,
"Unable to open log file directory " +
logFilesDir.getAbsolutePath());
}
for (File f : files) {
moveCompletedFile(f);
}
// Wipe out the current state of the LDIF Reader
if (ldifProcessor != null) {
try {
ldifProcessor.close();
} catch (IOException e) {
serverContext.logMessage(LogSeverity.MILD_ERROR,
e.getLocalizedMessage());
} finally {
ldifProcessor = null;
// Clear the bookmark
currentState = null;
}
}
break;
case RESUME_AT_CHANGE_TIME:
long time = options.getChangeTime().getTime();
debugInfo("Setting startpoint to resume at time " + time);
files = logFilesDir.listFiles(logFilter);
if (files == null) {
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open log file directory " +
logFilesDir.getAbsolutePath());
}
Arrays.sort(files, oldestToNewestComparator);
for (File f : files) {
if (f.lastModified() < time) {
moveCompletedFile(f);
} else {
// Update the bookmark to the file nearest the specified time
currentState = new ObjectPair<>(f, 0L);
break;
}
}
if (ldifProcessor != null) {
try {
ldifProcessor.close();
} catch (IOException e) {
ldifProcessor = null;
serverContext.logMessage(LogSeverity.MILD_ERROR,
e.getLocalizedMessage());
} finally {
// This will only be an approximation as the previous file
// may contain changes that occurred after the specified
// resume time.
initializeLDIFProcessor(currentState);
}
}
break;
case RESUME_AT_SERIALIZABLE:
debugInfo("Setting startpoint to resume at serialized state.");
if (currentState == null) {
// If there is no bookmark, start from the beginning
files = logFilesDir.listFiles(logFilter);
if (files == null) {
throw new EndpointException(
PostStepResult.ABORT_OPERATION,
"Unable to open log file directory "
+ logFilesDir.getAbsolutePath());
}
// Sort the files from oldest to newest
Arrays.sort(files, oldestToNewestComparator);
currentState = new ObjectPair<>(files[0], 0L);
changeNumber.set(0);
} else {
// Get the current bookmark from the options
Serializable token = options.getSerializableValue();
if (token != null) {
ChangeDetectorState state =
(ChangeDetectorState) token;
currentState =
new ObjectPair<>(state.getFile(), state.getPosition());
}
}
initializeLDIFProcessor(currentState);
break;
default:
throw new IllegalArgumentException(
"This startpoint type is not supported: " +
options.getStartpointType().toString());
}
}
/**
* Gets the current value of the startpoint for change detection. This is the
* "bookmark" which indicates which changes have already been processed and
* which have not. In most cases, a change number is used to detect changes
* and is managed by the Data Sync Server, in which case this
* implementation needs only to return the latest acknowledged
* change number. In other cases, the return value may correspond to a
* different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
* In any case, this method should return the value that is updated by
* {@link #acknowledgeCompletedOps(LinkedList)}.
*
* This method is called periodically and the return value is saved in the
* persistent state for the Sync Pipe that uses this extension as a Change
* Detector.
*
* IMPORTANT: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this extension (via
* {@link #acknowledgeCompletedOps(LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs.
*
* @return a value to store in the persistent state for the Sync Pipe. This is
* usually a change number, but if a changelog table is not used to
* detect changes, this value should represent some other token to
* pass into {@link #setStartpoint(SetStartpointOptions)}
* when the sync pipe starts up.
*/
@Override
public Serializable getStartpoint() {
// As the bookmark contains both the file and change number it contains
// enough information to be a useful startpoint.
return ChangeDetectorState.createState(currentState);
}
/**
* Return the next batch of change records from the source. Change records
* are usually just hints that a change happened; they do not include
* the full contents of the target entry. In an effort to never synchronize
* stale data, the Data Sync Server will go back and fetch the full
* target entry for each change record.
*
* On the first invocation, this should return changes starting from the
* startpoint that was set by
* {@link #setStartpoint(SetStartpointOptions)}. This method is also
* responsible for updating the internal state such that subsequent
* invocations do not return duplicate changes.
*
* The resulting list should be limited by maxChanges
. The
* numStillPending
reference should be set to the estimated
* number of changes that haven't yet been retrieved from the source endpoint
* when this method returns, or zero if all the current changes have been
* retrieved.
*
* IMPORTANT: While this method needs to keep track of which changes
* have already been returned so that it does not return them again, it should
* NOT modify the official startpoint. The internal value for the
* startpoint should only be updated after a sync operation is acknowledged
* back to this extension (via
* {@link #acknowledgeCompletedOps(LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs. The
* startpoint should not change as a result of this method.
*
* This method does not need to be thread-safe. It will be invoked
* repeatedly by a single thread, based on the polling interval set in the
* Sync Pipe configuration.
*
* @param maxChanges the maximum number of changes to retrieve
* @param numStillPending this should be set to the number of unretrieved
* changes that are still pending after this batch
* has been retrieved. This will be passed in as zero,
* and may be left that way if the actual value cannot
* be determined.
* @return a list of {@link ChangeRecord} instances, each
* corresponding to a single change at the source endpoint.
* If there are no new changes to return, this method should return
* an empty list.
* @throws EndpointException if there is any error while retrieving the
* next batch of changes
*/
@Override
public List getNextBatchOfChanges(
final int maxChanges,
final AtomicLong numStillPending)
throws EndpointException {
List results = new ArrayList();
boolean morePending = false;
File currentFile = currentState == null ? null : currentState.getFirst();
long currentFilePosition =
currentState == null ? 0L : currentState.getSecond();
try {
if (ldifProcessor != null) {
// We still have an unexhausted LDIFProcessor, keep processing
morePending = processChanges(results, maxChanges);
if (!morePending) {
// We've exhausted the current LDIFProcessor. We need to move
// the current file and position into pendingFiles so that we
// don't try to process the same file again.
pendingFiles.put(ldifProcessor.getCurrentState().getFirst(),
ldifProcessor.getCurrentState().getSecond());
}
}
// If we haven't reached the limit for the number of results to
// return we will continue processing.
if (results.size() < maxChanges) {
File[] files = logFilesDir.listFiles(logFilter);
if (files == null) {
throw new EndpointException(
PostStepResult.ABORT_OPERATION,
"Unable to open log file directory "
+ logFilesDir.getAbsolutePath());
}
// Nothing to process
if (files.length < 1) {
return results;
}
// Sort the files from oldest to newest
Arrays.sort(files, oldestToNewestComparator);
for (File file : files) {
// Keep track of which file we're currently working on
if (pendingFiles.containsKey(file) || !file.exists()) {
// File has already been processed.
continue;
}
// Initialize the LDIFProcessor with the current file
// and start at the beginning.
initializeLDIFProcessor(file, 0);
if (ldifProcessor == null) {
return results;
}
// Start processing
morePending = processChanges(results, maxChanges);
// Keep track of our current file and position for later processing.
currentFile = ldifProcessor.getCurrentState().getFirst();
currentFilePosition = ldifProcessor.getCurrentState().getSecond();
if (results.size() >= maxChanges) {
// Don't look at any more files if we've reached the maximum number
// of changes for this batch.
break;
}
}
}
} catch (IOException | LDIFException e) {
throw new EndpointException(
PostStepResult.ABORT_OPERATION,
"Problem with LDIFProcessor: "
+ StaticUtils.getExceptionMessage(e));
} catch (LDAPException e) {
throw new EndpointException(e);
}
if (!morePending) {
// We've processed all the currently held files.
currentState = null;
if (currentFile != null) {
pendingFiles.put(currentFile, currentFilePosition);
}
if (ldifProcessor != null) {
try {
ldifProcessor.close();
} catch (IOException e) {
serverContext.logMessage(LogSeverity.MILD_ERROR,
e.getLocalizedMessage());
} finally {
ldifProcessor = null;
}
}
} else {
// Need to indicate that there are more changes to get.
numStillPending.set(1);
// Update our bookmark
currentState = new ObjectPair<>(currentFile, currentFilePosition);
}
return results;
}
/**
* Processes changes from audit log files and returns them as a list of
* {@code ChangeRecord}s.
*
* @param results The list of ChangeRecords.
* @param maxChanges The maximum number of changes to process.
*
* @return True if more changes to process in current file.
*
* @throws IOException If unable to read file.
* @throws LDIFException If error processing the file as LDIF records.
* @throws LDAPException If unable to parse the DN from a change record.
*/
private boolean processChanges(final List results,
final int maxChanges)
throws IOException, LDIFException, LDAPException {
LDIFChangeRecord ldifChangeRecord;
// Iterate through all of the change records in the current file
while (ldifProcessor.hasNext()) {
ldifChangeRecord = ldifProcessor.next();
ObjectPair processorState =
ldifProcessor.getCurrentState();
// Skip ahead until we get to the stated position in currentState.
if (currentState != null &&
currentState.getSecond() > processorState.getSecond()) {
continue;
}
if (baseDN.isAncestorOf(ldifChangeRecord.getParsedDN(), false)) {
ChangeRecord.Builder bldr = new ChangeRecord.Builder(
ldifChangeRecord.getChangeType(),
ldifChangeRecord.getParsedDN());
// If the change is a moddn then set the identifiableInfoAfterChange
if (ldifChangeRecord.getChangeType() == ChangeType.MODIFY_DN) {
DN dnAfterChange =
((LDIFModifyDNChangeRecord) ldifChangeRecord).getNewDN();
bldr.identifiableInfoAfterChange(dnAfterChange);
bldr.changedAttributes(dnAfterChange.getRDN().getAttributeNames());
}
// If the change is a modification add all of the changed attribute
// names to the ChangeRecord.
if (ldifChangeRecord.getChangeType() == ChangeType.MODIFY) {
Modification[] mods =
((LDIFModifyChangeRecord) ldifChangeRecord)
.getModifications();
List changedAttrNames = new ArrayList<>();
for (Modification mod : mods) {
changedAttrNames.add(mod.getAttributeName());
}
bldr.changedAttributes(changedAttrNames.toArray(new String[0]));
}
// This example is using the system time as an approximation of
// the actual change time. If a true change time is available it
// is best to use that.
bldr.changeTime(System.currentTimeMillis());
bldr.changeNumber(changeNumber.getAndAdd(1));
// Attach the current file and file position so that we can
// update the internal state in inOrderAcknowledge.
bldr.addProperty(FILE_ATTACHMENT_KEY, processorState.getFirst());
bldr.addProperty(LOG_CHANGE_NUMBER_KEY, processorState.getSecond());
ChangeRecord change = bldr.build();
serverContext.debugInfo("Found change: " + change.toString());
results.add(change);
if (results.size() >= maxChanges) {
// Don't look at any more rows in this file if we've reached the
// maximum number of changes for this batch.
return true;
}
} else {
serverContext.logMessage(LogSeverity.DEBUG,
"Ignoring out-of-scope change: "
+ ldifChangeRecord.getDN());
}
}
return false;
}
/**
* Provides a way for the Data Sync Server to acknowledge back to the
* extension which sync operations it has processed. This method should update
* the official startpoint which was set by
* {@link #setStartpoint(SetStartpointOptions)} and is
* returned by {@link #getStartpoint()}.
*
* IMPORTANT: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this extension (via
* this method). Otherwise it will be possible for changes to be missed when
* the Data Sync Server is restarted or a connection error occurs.
*
* @param completedOps a list of {@link SyncOperation}s that have finished
* processing. The records are listed in the order they
* were first detected.
* @throws EndpointException if there is an error acknowledging the changes
* back to the source
*/
@Override
public void acknowledgeCompletedOps(
final LinkedList completedOps)
throws EndpointException {
ChangeRecord lastRecord = completedOps.getLast().getChangeRecord();
File sourceFile = (File) lastRecord.getProperty(FILE_ATTACHMENT_KEY);
Long logChangeNumber = (Long) lastRecord.getProperty(LOG_CHANGE_NUMBER_KEY);
File[] files = logFilesDir.listFiles(logFilter);
if (files == null) {
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open log file directory " +
logFilesDir.getAbsolutePath());
}
Arrays.sort(files, oldestToNewestComparator);
for (File f : files) {
if (f.lastModified() < sourceFile.lastModified() &&
pendingFiles.containsKey(f)) {
moveCompletedFile(f);
pendingFiles.remove(f);
}
}
if (pendingFiles.get(sourceFile) == logChangeNumber) {
moveCompletedFile(sourceFile);
pendingFiles.remove(sourceFile);
}
}
/**
* Moves a file that we have finished processing to the processed files
* directory and deletes the original file.
*
* @param srcFile the file to move
* @throws EndpointException if there is a problem moving the file
*/
private void moveCompletedFile(final File srcFile) throws EndpointException {
final File destFile = new File(processedFilesDir, srcFile.getName());
try {
debugInfo("Moving file " + srcFile.getAbsolutePath()
+ " to " + destFile.getAbsolutePath());
Files.move(srcFile.toPath(), destFile.toPath(), REPLACE_EXISTING);
} catch (Exception e) {
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Could not move " + srcFile.getAbsolutePath() +
" to " + destFile.getAbsolutePath(), e);
}
}
/**
* Initializes an {@code LDIFProcessor} using the current state.
*
* @param file The file to initialize the LDIFProcessor.
* @param startingPosition The zero indexed change from which to begin
* processing.
* @throws EndpointException if the LDIFProcessor cannot be successfully
* initialized.
*/
private void initializeLDIFProcessor(final File file,
final long startingPosition)
throws EndpointException {
if (ldifProcessor != null) {
try {
ldifProcessor.close();
} catch (IOException e) {
serverContext.logMessage(LogSeverity.MILD_ERROR,
"Unable to close LDIFProcessor. "
+ e.getLocalizedMessage());
}
}
try {
debugInfo("Initializing LDIFProcessor with " + file.getAbsolutePath()
+ " starting at change " + startingPosition);
ldifProcessor = new LDIFProcessor(file, startingPosition);
} catch (IOException | LDIFException e) {
throw new EndpointException(
PostStepResult.ABORT_OPERATION,
"Unable to initialize LDIFProcessor! " + e.getLocalizedMessage());
}
}
/**
* Initializes an {@code LDIFProcessor} using the current state.
*
* @param state An ObjectPair containing a File and position information.
* @throws EndpointException if the LDIFProcessor cannot be successfully
* initialized.
*/
private void initializeLDIFProcessor(final ObjectPair state)
throws EndpointException {
initializeLDIFProcessor(state.getFirst(), state.getSecond());
}
/**
* Wrapper for logging debug messages.
*
* @param message The message to log.
*/
private void debugInfo(final String message) {
if (serverContext != null) {
serverContext.debugInfo(message);
} else {
System.out.println(message);
}
}
/**
* A Serializable Wrapper class to contain the state stored in
* {@code currentState}. This is used to retrieve the Change Detector's
* state when resuming from being shut down.
*/
static final class ChangeDetectorState implements Serializable {
private static final long serialVersionUID = -6088093761035832413L;
private final File file;
private final long position;
/**
* Private Constructor to prevent instantiation.
*
* @param file File part of state.
* @param position Change position within the file.
*/
private ChangeDetectorState(final File file, final long position) {
this.file = file;
this.position = position;
}
/**
* Creates a ChangeDetectorState from an {@code ObjectPair}.
*
* @param state The current state.
* @return The resulting ChangeDetectorState.
*/
static ChangeDetectorState createState(final ObjectPair state) {
return new ChangeDetectorState(state.getFirst(), state.getSecond());
}
/**
* Get the file portion of the state.
*
* @return a file.
*/
File getFile() {
return file;
}
/**
* Get the position portion of the state.
*
* @return the position.
*/
long getPosition() {
return position;
}
}
/**
* Utility class to manage an LDIFReader and its position in the file
* currently being processed.
*/
static final class LDIFProcessor {
private final LDIFReader ldifReader;
private final File file;
// Tracks how many changes have been
// processed in the current file.
private long currentPosition;
private LDIFChangeRecord currentRecord;
private LDIFChangeRecord nextRecord;
/**
* Constructs a new LDIFProcessor.
*
* @param file The file to process.
* @param startingPosition The desired starting position.
*
* @throws IOException If unable to open the file.
* @throws LDIFException If unable to read changes as LDIF records.
*/
LDIFProcessor(final File file, final long startingPosition)
throws IOException, LDIFException {
this.file = file;
this.ldifReader = new LDIFReader(file);
this.currentPosition = startingPosition;
// If starting later in the file discard until the desired change.
if (startingPosition > 0) {
for (int i = 0; i < currentPosition; i++) {
ldifReader.readChangeRecord();
}
}
currentRecord = ldifReader.readChangeRecord(true);
nextRecord = ldifReader.readChangeRecord(true);
}
/**
* Gets the current state of this LDIFProcessor.
*
* @return An ObjectPair containing the current file and position.
*/
ObjectPair getCurrentState() {
return new ObjectPair<>(file,
currentRecord == null ? currentPosition-- : currentPosition);
}
/**
* Close the LDIFProcessor.
*
* @throws IOException If error closing LDIFReader.
*/
void close() throws IOException {
ldifReader.close();
}
/**
* Returns true if the LDIFProcessor has more changes.
*
* @return True if more changes.
*/
boolean hasNext() {
return currentRecord != null;
}
/**
* Get next LDIFChangeRecord.
*
* @return The next change record or null.
*
* @throws IOException If error reading from file.
* @throws LDIFException If error parsing file as LDIF.
*/
LDIFChangeRecord next() throws IOException, LDIFException {
final LDIFChangeRecord toReturn = currentRecord;
currentRecord = nextRecord;
if (currentRecord != null) {
nextRecord = ldifReader.readChangeRecord(true);
}
currentPosition++;
return toReturn;
}
}
}