/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* docs/licenses/cddl.txt
* or http://www.opensource.org/licenses/cddl1.php.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* docs/licenses/cddl.txt. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2011-2016 UnboundID Corp.
*/
package com.unboundid.directory.sdk.examples.groovy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import au.com.bytecode.opencsv.CSVParser;
import au.com.bytecode.opencsv.CSVReader;
import com.unboundid.ldap.sdk.ChangeType;
import com.unboundid.ldap.sdk.Entry;
import com.unboundid.ldap.sdk.DN;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.DNArgument;
import com.unboundid.util.args.FileArgument;
import com.unboundid.util.StaticUtils;
import com.unboundid.directory.sdk.sync.scripting.ScriptedSyncSource;
import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
import com.unboundid.directory.sdk.sync.types.EndpointException;
import com.unboundid.directory.sdk.sync.types.PostStepResult;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions.StartpointType;
import com.unboundid.directory.sdk.sync.types.ChangeRecord;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.common.types.LogSeverity;
/**
* This class provides a simple example of a generic Sync Source which will
* detect CSV files in a given directory and synchronize them to an
* LDAP destination. More specifically, the extension will look for any files
* with the ".csv" extension, and interpret each row to be a single LDAP entry
* to be synchronized. After a file has been completely processed, it will be
* moved to a separate directory.
* <p>
* The CSV header should contain valid LDAP attribute names. The 'uid' attribute
* is used as the RDN and is required to be present, and the source base DN
* is configurable using an extension argument. The DNs for the entries will be
* constructed as follows: uid={uid},{baseDN}. Each row in a CSV file is
* expected to have values for all of the LDAP attributes, not just those that
* have changed.
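* <p>
* For illustration, a CSV file with the following contents (hypothetical
* attribute names and values) would yield two source entries with the DNs
* uid=jdoe,dc=example,dc=com and uid=asmith,dc=example,dc=com, assuming the
* default base DN:
* <pre>
* uid,givenName,sn,cn,mail
* jdoe,John,Doe,John Doe,jdoe@example.com
* asmith,Alice,Smith,Alice Smith,asmith@example.com
* </pre>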
* <p>
* This extension creates ChangeRecords using {@link ChangeType#MODIFY}, so when
* using this example, it's important to set <i>modifies-as-creates=true</i>
* on the Sync Class in the configuration. This will allow the Sync Pipe to
* create entries that don't already exist. For the sake of simplicity, this
* example does not handle DELETE operations.
* <p>
* This extension uses the OpenCSV library, which is available at
* <a href="http://opencsv.sourceforge.net">http://opencsv.sourceforge.net</a>
* and provides simple CSV parsing.
* <p>
* The following arguments are defined:
* <UL>
* <LI>csv-files-dir -- The filesystem directory to monitor for CSV files</LI>
* <LI>processed-files-dir -- The filesystem directory where finished CSV
* files should be moved</LI>
* <LI>base-dn -- The base DN to use when creating LDAP entries from the CSV
* content</LI>
* </UL>
*/
public final class ExampleScriptedSyncSource extends ScriptedSyncSource
{
//The server context which can be used for obtaining the server state,
//logging, etc.
private SyncServerContext serverContext;
//The openCSV reader to handle parsing CSV files.
private CSVReader csvReader;
//A map of files that have been completely detected by getNextBatchOfChanges()
//(i.e. all rows have been read and returned) to the number of records that
//were in each file.
private final Map<File,Long> pendingFiles =
new ConcurrentHashMap<File,Long>();
//A handle to the current file which we can use to lock it while processing.
private RandomAccessFile fileHandle;
//The directory which we will monitor for new CSV files.
private File csvFilesDir;
//The directory where we will move files after they have been processed.
private File processedFilesDir;
//The base DN to use for entries in the CSV files.
private DN baseDN;
//The set of attribute names for the current CSV file. (This represents the
//CSV header).
private String[] attributeNames;
//The file filter used to match CSV files in the csvDirectory.
private FileFilter csvFilter;
//A Comparator to sort the files from oldest to newest
private Comparator<File> oldestToNewestComparator;
//The attachment key used to place the CSV filename on the ChangeRecord
//so that acknowledgeCompletedOps() can reference it.
private static final String FILE_ATTACHMENT_KEY = "File.Attachment";
//The attachment key used to place the row number of a record in the CSV on
//the ChangeRecord when it is built.
private static final String CSV_ROW_NUMBER_KEY = "CSV.RowNumber";
/**
* Retrieves a human-readable name for this extension.
*
* @return A human-readable name for this extension.
*/
@Override
public String getExtensionName()
{
return "Example Sync Source";
}
/**
* Retrieves a human-readable description for this extension. Each element
* of the array that is returned will be considered a separate paragraph in
* generated documentation.
*
* @return A human-readable description for this extension, or {@code null}
* or an empty array if no description should be available.
*/
@Override
public String[] getExtensionDescription()
{
return [
"This extension implements a Sync Source which can be used " +
"to detect and synchronize changes from CSV files in a given " +
"directory. The extension will look for any files with the .csv " +
"extension and interpret each row to be a single LDAP entry to " +
"be synchronized. After a file has been completely processed, " +
"it will be moved to a separate directory.",
"For the sake of simplicity, the columns in the files " +
"should be LDAP attributes (cn, sn, givenname, etc). The " +
"first line (CSV header) should contain the attribute names " +
"and the first column is always required to be the 'uid' " +
"attribute. The DNs for the entries will be constructed as " +
"follows: uid={uid},{baseDN}. Each row in a CSV file is " +
"expected to have values for all of the LDAP attributes, not " +
"just those that have changed."
];
}
/**
* Updates the provided argument parser to define any configuration arguments
* which may be used by this extension. The argument parser may also be
* updated to define relationships between arguments (e.g. to specify
* required, exclusive, or dependent argument sets).
*
* @param parser The argument parser to be updated with the configuration
* arguments which may be used by this extension.
*
* @throws ArgumentException If a problem is encountered while updating the
* provided argument parser.
*/
@Override
public void defineConfigArguments(final ArgumentParser parser)
throws ArgumentException
{
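//For the FileArgument constructors below, the four trailing booleans
//correspond to fileMustExist, parentMustExist, mustBeFile, and
//mustBeDirectory, so each configured path must be an existing directory.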
FileArgument newFilesArg = new FileArgument(
'd' as char, "csv-files-dir", true, 1, "{path}",
"The file system directory to monitor for CSV files",
true, true, false, true);
parser.addArgument(newFilesArg);
FileArgument processedFilesArg = new FileArgument(
'p' as char, "processed-files-dir", true, 1, "{path}",
"The file system directory where the extension should " +
"move CSV files after it finishes processing them",
true, true, false, true);
parser.addArgument(processedFilesArg);
DN defaultDN = null;
try
{
defaultDN = new DN("dc=example,dc=com");
}
catch(LDAPException e)
{
throw new ArgumentException("Could not construct a default DN", e);
}
DNArgument baseDNArg = new DNArgument('b' as char, "base-dn", true, 1,
"{base-dn}", "The base DN to use for entries in the CSV files",
defaultDN);
parser.addArgument(baseDNArg);
}
/**
* This hook is called when a Sync Pipe first starts up, when the
* <i>resync</i> process first starts up, or when the set-startpoint
* subcommand is called from the <i>realtime-sync</i> command line tool.
* Any initialization of this sync source should be performed here. This
* method should generally store the {@link SyncServerContext} in a class
* member so that it can be used elsewhere in the implementation.
*
* @param serverContext A handle to the server context for the server in
* which this extension is running.
* @param config The general configuration for this sync source.
* @param parser The argument parser which has been initialized from
* the configuration for this sync source.
*/
@Override
public void initializeSyncSource(final SyncServerContext serverContext,
final SyncSourceConfig config,
final ArgumentParser parser)
{
this.serverContext = serverContext;
//Implement FileFilter the Groovy way
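//(Groovy's 'as' keyword coerces a closure, or a map of method-name-to-closure
//entries, into an implementation of the target interface.)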
csvFilter = { it.isFile() && it.canRead() &&
it.getName().endsWith(".csv")} as FileFilter;
//Implement Comparator the Groovy way
oldestToNewestComparator = [compare:
{f1,f2 -> f1.lastModified() - f2.lastModified() > 0 ? 1 :
f1.lastModified() - f2.lastModified() < 0 ? -1 : 0}] as Comparator;
FileArgument newFilesArg =
(FileArgument) parser.getNamedArgument("csv-files-dir");
csvFilesDir = newFilesArg.getValue();
FileArgument processedFilesArg =
(FileArgument) parser.getNamedArgument("processed-files-dir");
processedFilesDir = processedFilesArg.getValue();
DNArgument baseDNArg = (DNArgument) parser.getNamedArgument("base-dn");
baseDN = baseDNArg.getValue();
}
/**
* This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
* process shuts down, or when the set-startpoint subcommand (from the
* <i>realtime-sync</i> command line tool) is finished. Any clean up of this
* sync source should be performed here.
*/
@Override
public void finalizeSyncSource()
{
if(csvReader != null)
{
try
{
csvReader.close();
}
catch(IOException e)
{
serverContext.debugThrown(e);
}
}
}
/**
* This method should effectively set the starting point for synchronization
* to the place specified by the <code>options</code> parameter. This should
* cause all changes previous to the specified start point to be disregarded
* and only changes after that point to be returned by
* {@link #getNextBatchOfChanges(int, AtomicLong)}.
* <p>
* There are several different startpoint types (see
* {@link SetStartpointOptions}), and this implementation is not required to
* support them all. If the specified startpoint type is unsupported, this
* method should throw an {@link UnsupportedOperationException}.
* <p>
* <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
* must be supported by your implementation, because this is used when a Sync
* Pipe first starts up. The {@link Serializable} in this case is the same
* type that is returned by {@link #getStartpoint()}; the Data Sync Server
* persists it and passes it back in on a restart.
* <p>
* This method can be called from two different contexts:
* <ul>
* <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
* (the Sync Pipe is required to be stopped in this context)</li>
* <li>Immediately after a Sync Pipe starts up and a connection is first
* established to the source server (e.g. before the first call to
* {@link #getNextBatchOfChanges(int, AtomicLong)})</li>
* </ul>
*
* @param options
* an object which indicates where exactly to start synchronizing
* (e.g. the end of the changelog, specific change number, a certain
* time ago, etc)
* @throws EndpointException
* if there is any error while setting the start point
*/
@Override
@SuppressWarnings("unchecked")
public void setStartpoint(final SetStartpointOptions options)
throws EndpointException
{
File[] files;
switch(options.getStartpointType())
{
case StartpointType.BEGINNING_OF_CHANGELOG:
//Nothing to do here, getNextBatchOfChanges() will automatically start
//at the beginning (e.g. it will pick up at the first file in the
//csvDirectory).
break;
case StartpointType.END_OF_CHANGELOG:
//Move all files to the processed files directory
files = csvFilesDir.listFiles(csvFilter);
if (files == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open CSV file directory " + csvFilesDir.getAbsolutePath())
}
for(File f : files)
{
moveCompletedFile(f);
}
break;
case StartpointType.RESUME_AT_CHANGE_TIME:
long time = options.getChangeTime().getTime();
files = csvFilesDir.listFiles(csvFilter);
if (files == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open CSV file directory " + csvFilesDir.getAbsolutePath())
}
Arrays.sort(files, oldestToNewestComparator);
for(File f : files)
{
if(f.lastModified() < time)
{
moveCompletedFile(f);
}
else
{
break;
}
}
break;
case StartpointType.RESUME_AT_SERIALIZABLE:
//When sync first starts up, this method is called with
//RESUME_AT_SERIALIZABLE to initialize the internal state.
//Since this extension does not save any state, we do not have to
//set anything here.
break;
default:
throw new IllegalArgumentException(
"This startpoint type is not supported: " +
options.getStartpointType().toString());
}
}
/**
* Gets the current value of the startpoint for change detection. This is the
* "bookmark" which indicates which changes have already been processed and
* which have not. In most cases, a change number is used to detect changes
* and is managed by the Data Sync Server, in which case this
* implementation needs only to return the latest acknowledged
* change number. In other cases, the return value may correspond to a
* different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
* In any case, this method should return the value that is updated by
* {@link #acknowledgeCompletedOps(LinkedList)}.
* <p>
* This method is called periodically and the return value is saved in the
* persistent state for the Sync Pipe that uses this extension as its Sync
* Source.
* <p>
* <b>IMPORTANT</b>: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this script (via
* {@link #acknowledgeCompletedOps(LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs.
* @return a value to store in the persistent state for the Sync Pipe. This is
* usually a change number, but if a changelog table is not used to
* detect changes, this value should represent some other token to
* pass into {@link #setStartpoint(SetStartpointOptions)}
* when the sync pipe starts up.
*/
@Override
public Serializable getStartpoint()
{
//We return null here because we are synchronizing from a directory of files
//where the files are removed once they have been synchronized. In this
//case, we do not need to keep track of where to start; we always start at
//the oldest file in the directory and work forward.
return null;
}
/**
* Return the URL or path identifying the source endpoint
* from which this extension is transmitting data. This is used for logging
* purposes only, so it could just be a server name or hostname and port, etc.
*
* @return the path to the source endpoint
*/
@Override
public String getCurrentEndpointURL()
{
return csvFilesDir.getAbsolutePath();
}
/**
* Return a full source entry (in LDAP form) from the source, corresponding
* to the {@link ChangeRecord} that is passed in through the
* {@link SyncOperation}. This method should perform any queries necessary to
* gather the latest values for all the attributes to be synchronized.
* <p>
* This method <b>must be thread safe</b>, as it will be called repeatedly and
* concurrently by each of the Sync Pipe worker threads as they process
* entries.
* <p>
* If the original ChangeRecord has the full entry already set on it (which
* can be done using the {@link ChangeRecord.Builder#fullEntry(Entry)}),
* then this method will not get called, and the Data Sync Server will
* automatically use the full entry from the ChangeRecord. In this case, the
* implementation can always return {@code null}.
*
* @param operation
* the SyncOperation which identifies the source "entry" to
* fetch. The ChangeRecord can be obtained by calling
* <code>operation.getChangeRecord()</code>.
* These ChangeRecords are generated by
* {@link #getNextBatchOfChanges(int, AtomicLong)}
* or by
* {@link #listAllEntries(BlockingQueue)}.
*
* @return a full LDAP Entry, or null if no such entry exists.
* @throws EndpointException
* if there is an error fetching the entry
*/
@Override
public Entry fetchEntry(final SyncOperation operation)
throws EndpointException
{
ChangeRecord record = operation.getChangeRecord();
throw new IllegalStateException(
"fetchEntry() should not be called because the full entry is set " +
"on the ChangeRecord: " + record.toString());
}
/**
* Provides a way for the Data Sync Server to acknowledge back to the
* script which sync operations it has processed. This method should update
* the official startpoint which was set by
* {@link #setStartpoint(SetStartpointOptions)} and is
* returned by {@link #getStartpoint()}.
* <p>
* <b>IMPORTANT</b>: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this extension (via
* this method). Otherwise it will be possible for changes to be missed when
* the Data Sync Server is restarted or a connection error occurs.
*
* @param completedOps
* a list of {@link SyncOperation}s that have finished processing.
* The records are listed in the order they were first detected.
* @throws EndpointException
*           if there is an error acknowledging the changes back to the
*           source endpoint
*/
@Override
public void acknowledgeCompletedOps(
final LinkedList<SyncOperation> completedOps)
throws EndpointException
{
ChangeRecord record = completedOps.getLast().getChangeRecord();
File lastFile = (File) record.getProperty(FILE_ATTACHMENT_KEY);
Long rowNumber = (Long) record.getProperty(CSV_ROW_NUMBER_KEY);
//Move completed files to the completed files directory
File[] files = csvFilesDir.listFiles(csvFilter);
if (files == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open CSV file directory " + csvFilesDir.getAbsolutePath())
}
for(File f : files)
{
//Because changes are acknowledged in order and we detect them from oldest
//to newest, we know that if a file is older than the lastFile from above,
//we have finished processing it.
if(f.lastModified() < lastFile.lastModified())
{
moveCompletedFile(f);
pendingFiles.remove(f);
}
}
//If the last acknowledged file is completely finished, move it to the
//processed files directory as well.
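//(In Groovy, '==' on objects performs a value comparison rather than an
//identity check, so this correctly compares the Long row counts.)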
if(pendingFiles.get(lastFile) == rowNumber)
{
moveCompletedFile(lastFile);
pendingFiles.remove(lastFile);
}
}
/**
* Return the next batch of change records from the source. Change records
* are usually just hints that a change happened; they do not include
* the full contents of the target entry. In an effort to never synchronize
* stale data, the Data Sync Server will go back and fetch the full
* target entry for each change record.
* <p>
* On the first invocation, this should return changes starting from the
* startpoint that was set by
* {@link #setStartpoint(SetStartpointOptions)}. This method is also
* responsible for updating the internal state such that subsequent
* invocations do not return duplicate changes.
* <p>
* The resulting list should be limited by <code>maxChanges</code>. The
* <code>numStillPending</code> reference should be set to the estimated
* number of changes that haven't yet been retrieved from the source endpoint
* when this method returns, or zero if all the current changes have been
* retrieved.
* <p>
* <b>IMPORTANT</b>: While this method needs to keep track of which changes
* have already been returned so that it does not return them again, it should
* <b>NOT</b> modify the official startpoint. The internal value for the
* startpoint should only be updated after a sync operation is acknowledged
* back to this script (via
* {@link #acknowledgeCompletedOps(LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs. The
* startpoint should not change as a result of this method.
* <p>
* This method <b>does not need to be thread-safe</b>. It will be invoked
* repeatedly by a single thread, based on the polling interval set in the
* Sync Pipe configuration.
*
* @param maxChanges
* the maximum number of changes to retrieve
* @param numStillPending
* this should be set to the number of unretrieved changes that
* are still pending after this batch has been retrieved. This will
* be passed in
* as zero, and may be left that way if the actual value cannot be
* determined.
* @return a list of {@link ChangeRecord} instances, each
* corresponding to a single change at the source endpoint.
* If there are no new changes to return, this method should return
* an empty list.
* @throws EndpointException
* if there is any error while retrieving the next batch of changes
*/
@Override
public List<ChangeRecord> getNextBatchOfChanges(
final int maxChanges,
final AtomicLong numStillPending)
throws EndpointException
{
List<ChangeRecord> results = new ArrayList<ChangeRecord>();
File[] files = csvFilesDir.listFiles(csvFilter);
if (files == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open CSV file directory " + csvFilesDir.getAbsolutePath())
}
//Sort the files from oldest to newest
Arrays.sort(files, oldestToNewestComparator);
long currentRowNum = 0;
for(File f : files)
{
if(pendingFiles.containsKey(f) || !f.exists())
{
//This file has already been completely detected.
continue;
}
try
{
if(csvReader == null)
{
fileHandle = new RandomAccessFile(f, "rw");
FileChannel channel = fileHandle.getChannel();
//Try to acquire an exclusive lock on the file while we process it
FileLock lock = channel.tryLock();
if(lock == null)
{
//Another process is using the file
continue;
}
//We use the openCSV library here to handle CSV parsing
csvReader = new CSVReader(new FileReader(f));
//The header line contains the attribute names
attributeNames = csvReader.readNext();
if(attributeNames == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Cannot read CSV header from " + f.getAbsolutePath());
}
//Check if the first attribute is 'uid'
if(!"uid".equalsIgnoreCase(attributeNames[0]))
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"First attribute is not 'uid' in file " + f.getAbsolutePath());
}
}
currentRowNum = 0;
String[] attributeValues;
while ((attributeValues = csvReader.readNext()) != null) {
currentRowNum++;
//Create the entry
Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN);
//Add the attributes
for(int i = 0; i < attributeValues.length; i++)
{
e.setAttribute(attributeNames[i], attributeValues[i]);
}
//Construct a ChangeRecord
ChangeRecord.Builder bldr = new ChangeRecord.Builder(
ChangeType.MODIFY, e.getParsedDN());
bldr.changedAttributes(attributeNames);
bldr.changeTime(System.currentTimeMillis());
//Add the full entry to the ChangeRecord. This will cause the
//Sync engine to skip the call to fetchEntry() and instead use this
//entry.
bldr.fullEntry(e);
//Add the file as an attachment so that we can reference it when this
//operation makes it back to acknowledgeCompletedOps()
bldr.addProperty(FILE_ATTACHMENT_KEY, f);
//Add the row number as an attachment so that we can update the
//starting point when this operation makes it back to
//acknowledgeCompletedOps()
bldr.addProperty(CSV_ROW_NUMBER_KEY, currentRowNum);
//Build the ChangeRecord and add it to the result list
results.add(bldr.build());
if(results.size() >= maxChanges)
{
//Don't look at any more rows in this file if we've reached the
//maximum number of changes for this batch.
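//The csvReader is intentionally left open so that the next invocation
//resumes reading this file from the current position.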
break;
}
}
if(attributeValues == null)
{
//We've reached the end of this file. Record the file and how many
//rows were in it so that we can properly move it to the processed
//files directory once it is acknowledged.
csvReader.close();
csvReader = null;
pendingFiles.put(f, currentRowNum);
}
if(results.size() >= maxChanges)
{
//Don't look at any more files if we've reached the maximum number of
//changes for this batch.
break;
}
}
catch(Exception e)
{
serverContext.logMessage(LogSeverity.SEVERE_ERROR,
StaticUtils.getExceptionMessage(e));
}
finally
{
if(fileHandle != null && csvReader == null)
{
//This releases the exclusive lock we had on the file
try
{
fileHandle.close();
}
catch(IOException e)
{
serverContext.logMessage(LogSeverity.SEVERE_ERROR,
StaticUtils.getExceptionMessage(e));
}
}
}
}
return results;
}
/**
* Gets a list of all the entries in the source endpoint. This is used by the
* 'resync' command line tool. The default implementation throws a
* {@link UnsupportedOperationException}; subclasses should override if the
* resync functionality is needed.
* <p>
* The <code>outputQueue</code> should contain {@link ChangeRecord} objects
* with the <code>ChangeType</code> set to <code>null</code> to indicate that
* these are resync operations.
* <p>
* This method should not return until all the entries at the source
* have been added to the output queue. Separate threads will concurrently
* drain entries from the queue and process them. The queue typically should
* not contain full entries, but rather ChangeRecord objects which identify
* the full source entries. These objects are then individually passed in to
* {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure
* that the ChangeRecord instances contain enough identifiable information
* (e.g. primary keys) for each entry so that the entry can be found again.
* <p>
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
* <ol>
* <li>Stream out a list of identifiers for the entries in the source
* endpoint, using a ChangeRecord as the identifier</li>
* <li>Fetch full source entry for a ChangeRecord</li>
* <li>Perform any mappings and compute the equivalent destination entry</li>
* <li>Fetch full destination entry</li>
* <li>Diff the computed destination entry and actual destination entry</li>
* <li>Apply the minimal set of changes at the destination to bring it in sync
* </li>
* </ol>
* <p>
* Alternatively, the full entry can be set on the ChangeRecord within this
* method, which will cause the "fetch full entry" step to be skipped. In this
* case the Data Sync Server will just use the entry specified on the
* ChangeRecord.
* <p>
* If the total set of entries is very large, it is fine to split up the work
* into multiple network queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
* <p>
* @param outputQueue
* a queue of ChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(SyncOperation)}
* @throws EndpointException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
throws EndpointException
{
serverContext.debugInfo("Beginning to dump all entries...");
File[] files = csvFilesDir.listFiles(csvFilter);
if (files == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Unable to open CSV file directory " + csvFilesDir.getAbsolutePath())
}
for(File f : files)
{
try
{
//We use the openCSV library here to handle CSV parsing
CSVReader reader = new CSVReader(new FileReader(f));
//The header line contains the attribute names
String[] attrNames = reader.readNext();
if(attrNames == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Cannot read header line from " + f.getAbsolutePath());
}
//Check if the first attribute is 'uid'
if(!"uid".equalsIgnoreCase(attrNames[0]))
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"First attribute is not 'uid' in file " + f.getAbsolutePath());
}
String[] attributeValues;
while ((attributeValues = reader.readNext()) != null) {
//Create the entry
Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN);
//Add the attributes
for(int i = 0; i < attributeValues.length; i++)
{
e.setAttribute(attrNames[i], attributeValues[i]);
}
//Construct a ChangeRecord with a null ChangeType to indicate resync
ChangeRecord.Builder bldr =
new ChangeRecord.Builder(null, e.getParsedDN());
bldr.changedAttributes(attrNames);
bldr.changeTime(f.lastModified());
//Add the full entry to the ChangeRecord. This will cause the
//Sync engine to skip the call to fetchEntry() and instead use this
//entry.
bldr.fullEntry(e);
//Note: there is no "acknowledge" phase for resync, so we do not need
//to worry about adding the file attachment here like we do in
//getNextBatchOfChanges().
outputQueue.add(bldr.build());
}
}
catch(Exception e)
{
serverContext.logMessage(LogSeverity.SEVERE_ERROR,
StaticUtils.getExceptionMessage(e));
}
}
}
/**
* Gets a list of all the entries in the source from a given file input.
* This is used by the 'resync' command line tool. The default implementation
* throws a {@link UnsupportedOperationException}; subclasses should override
* if the resync functionality is needed for specific records, which
* can be specified in the input file.
* <p>
* The format for the <code>inputLines</code> (e.g. the content of the file)
* is user-defined; it may be key/value pairs, primary keys, or full SQL
* statements, for example. The use of this method is triggered via the
* <i>--sourceInputFile</i> argument on the resync CLI. The
* <code>outputQueue</code> should contain {@link ChangeRecord}
* objects with the <code>ChangeType</code> set to <code>null</code> to
* indicate that these are resync operations.
* <p>
* This method should not return until all the entries specified by the input
* file have been added to the output queue. Separate threads will
* concurrently drain entries from the queue and process them. The queue
* typically should not contain full entries, but rather ChangeRecord
* objects which identify the full source entries. These objects are then
* individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore,
* it is important to make sure that the ChangeRecord instances
* contain enough identifiable information (e.g. primary keys) for each entry
* so that the entry can be found again.
* <p>
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
* <ol>
* <li>Stream out a list of identifiers for entries in the source endpoint,
* using the given input file as the basis for which entries to resync
* </li>
* <li>Fetch full source entry for an identifier</li>
* <li>Perform any mappings and compute the equivalent destination entry</li>
* <li>Fetch full destination entry</li>
* <li>Diff the computed destination entry and actual destination entry</li>
* <li>Apply the minimal set of changes at the destination to bring it in sync
* </li>
* </ol>
* <p>
* Alternatively, the full entry can be set on the ChangeRecord within this
* method, which will cause the "fetch full entry" step to be skipped. In this
* case the Data Sync Server will just use the entry specified on the
* ChangeRecord.
* <p>
* If the total set of entries is very large, it is fine to split up the work
* into multiple network queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
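* <p>
* For this extension, the input file is expected to use the same CSV format
* as the monitored files, for example (hypothetical values):
* <pre>
* uid,givenName,sn,cn,mail
* jdoe,John,Doe,John Doe,jdoe@example.com
* </pre>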
* <p>
* @param inputLines
* an Iterator containing the lines from the specified input file to
* resync (this is specified on the CLI for the resync command).
* These lines can be any format, for example a set of primary keys,
* a set of WHERE clauses, a set of full SQL queries, etc.
* @param outputQueue
* a queue of ChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(SyncOperation)}
* @throws EndpointException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(final Iterator<String> inputLines,
final BlockingQueue<ChangeRecord> outputQueue)
throws EndpointException
{
serverContext.debugInfo("Beginning to dump entries from file...");
try
{
//We use the openCSV library here to handle CSV parsing
CSVParser parser = new CSVParser();
String headerLine = inputLines.next();
String[] attrNames = parser.parseLine(headerLine);
//The header line contains the attribute names
if(attrNames == null)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Cannot read header line from input file");
}
//Check if the first attribute is 'uid'
if(!"uid".equalsIgnoreCase(attrNames[0]))
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"First attribute is not 'uid' in the input file");
}
while(inputLines.hasNext())
{
//Parse one line per iteration of the enclosing loop so that next() is
//never called after the input has been exhausted.
String[] attributeValues = parser.parseLine(inputLines.next());
if (attributeValues != null)
{
//Create the entry
Entry e = new Entry("uid=" + attributeValues[0] + "," + baseDN);
//Add the attributes
for(int i = 0; i < attributeValues.length; i++)
{
e.setAttribute(attrNames[i], attributeValues[i]);
}
//Construct a ChangeRecord with a null ChangeType to indicate resync
ChangeRecord.Builder bldr =
new ChangeRecord.Builder(null, e.getParsedDN());
bldr.changedAttributes(attrNames);
//Add the full entry to the ChangeRecord. This will cause the
//Sync engine to skip the call to fetchEntry() and instead use this
//entry.
bldr.fullEntry(e);
//Note: there is no "acknowledge" phase for resync, so we do not need
//to worry about adding the file attachment here like we do in
//getNextBatchOfChanges().
//Add the ChangeRecord to the output queue
outputQueue.add(bldr.build());
}
}
}
catch(Exception e)
{
serverContext.logMessage(LogSeverity.SEVERE_ERROR,
StaticUtils.getExceptionMessage(e));
}
}
/**
* Moves a file that we have finished processing to the processed files
* directory and deletes the original file.
*
* @param srcFile the file to move
* @throws EndpointException if there is a problem moving the file
*/
private void moveCompletedFile(final File srcFile) throws EndpointException
{
final File destFile = new File(processedFilesDir, srcFile.getName());
try
{
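//Note that File.renameTo() may return false rather than throwing (for
//example, if the two directories are on different file systems), which is
//treated as a failed move below.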
if(!srcFile.renameTo(destFile))
{
throw new Exception("File.renameTo() returned false");
}
}
catch(Exception e)
{
throw new EndpointException(PostStepResult.ABORT_OPERATION,
"Could not move " + srcFile.getAbsolutePath() +
" to " + destFile.getAbsolutePath(), e);
}
}
}
|