/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * docs/licenses/cddl.txt
 * or http://www.opensource.org/licenses/cddl1.php.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * docs/licenses/cddl.txt.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2011-2021 Ping Identity Corporation
 */
package com.unboundid.directory.sdk.examples.groovy;

import java.io.File;
import java.io.Serializable;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

import au.com.bytecode.opencsv.CSVParser;
import au.com.bytecode.opencsv.CSVReader;

import com.unboundid.ldap.sdk.ChangeType;
import com.unboundid.ldap.sdk.Entry;
import com.unboundid.ldap.sdk.DN;
import com.unboundid.ldap.sdk.LDAPException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.DNArgument;
import com.unboundid.util.args.FileArgument;
import com.unboundid.util.StaticUtils;
import com.unboundid.directory.sdk.sync.scripting.ScriptedSyncSource;
import com.unboundid.directory.sdk.sync.config.SyncSourceConfig;
import com.unboundid.directory.sdk.sync.types.EndpointException;
import com.unboundid.directory.sdk.sync.types.PostStepResult;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions.StartpointType;
import com.unboundid.directory.sdk.sync.types.ChangeRecord;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.common.types.LogSeverity;

/**
 * This class provides a simple example of a generic Sync Source which will
 * detect CSV files in a given directory and synchronize them to an
 * LDAP destination. More specifically, the extension will look for any files
 * with the ".csv" extension, and interpret each row to be a single LDAP entry
 * to be synchronized. After a file has been completely processed, it will be
 * moved to a separate directory.
 *
 * The CSV header should contain valid LDAP attribute names. The 'uid'
 * attribute is used as the RDN and is required to be present, and the source
 * base DN is configurable using an extension argument. The DNs for the
 * entries will be constructed as follows: uid={uid},{baseDN}. Each row in a
 * CSV file is expected to have values for all of the LDAP attributes, not
 * just those that have changed.
 *
 * This extension creates ChangeRecords using {@link ChangeType#MODIFY}, so
 * when using this example, it's important to set modifies-as-creates=true
 * on the Sync Class in the configuration. This will allow the Sync Pipe to
 * create entries that don't already exist. For the sake of simplicity, this
 * example does not handle DELETE operations.
 *
 * This extension uses the OpenCSV library, which is available at
 * <a href="http://opencsv.sourceforge.net">http://opencsv.sourceforge.net</a>
 * and provides simple CSV parsing.
 *
 * The following arguments are defined:
 *
 * This method should effectively set the starting point for synchronization
 * to the place specified by the <code>options</code> parameter. This should
 * cause all changes previous to the specified start point to be disregarded
 * and only changes after that point to be returned by
 * {@link #getNextBatchOfChanges(int, AtomicLong)}.
 *
 * There are several different startpoint types (see
 * {@link SetStartpointOptions}), and this implementation is not required to
 * support them all. If the specified startpoint type is unsupported, this
 * method should throw an {@link UnsupportedOperationException}.
 *
 * IMPORTANT: The RESUME_AT_SERIALIZABLE startpoint type must be supported
 * by your implementation, because this is used when a Sync Pipe first
 * starts up. The {@link Serializable} in this case is the same type that is
 * returned by {@link #getStartpoint()}; the Data Sync Server persists it and
 * passes it back in on a restart.
 *
 * This method can be called from two different contexts:
 *
 * This method is called periodically and the return value is saved in the
 * persistent state for the Sync Pipe that uses this extension as its Sync
 * Source.
 *
 * IMPORTANT: The internal value for the startpoint should only be
 * updated after a sync operation is acknowledged back to this script (via
 * {@link #acknowledgeCompletedOps(LinkedList)}).
 * Otherwise it will be possible for changes to be missed when the
 * Data Sync Server is restarted or a connection error occurs.
 *
 * @return a value to store in the persistent state for the Sync Pipe. This
 *         is usually a change number, but if a changelog table is not used
 *         to detect changes, this value should represent some other token
 *         to pass into {@link #setStartpoint(SetStartpointOptions)}
 *         when the sync pipe starts up.
 */
@Override
public Serializable getStartpoint()
{
  // We return null here because we are synchronizing from a directory of
  // files where the files are removed once they have been synchronized. In
  // this case, we do not need to keep track of where to start; we always
  // start at the oldest file in the directory and work forward.
  return null;
}

/**
 * Return the URL or path identifying the source endpoint
 * from which this extension is transmitting data. This is used for logging
 * purposes only, so it could just be a server name or hostname and port,
 * etc.
 *
 * @return the path to the source endpoint
 */
@Override
public String getCurrentEndpointURL()
{
  return csvFilesDir.getAbsolutePath();
}

/**
 * Return a full source entry (in LDAP form) from the source, corresponding
 * to the {@link ChangeRecord} that is passed in through the
 * {@link SyncOperation}. This method should perform any queries necessary
 * to gather the latest values for all the attributes to be synchronized.
 *
 * This method must be thread safe, as it will be called repeatedly and
 * concurrently by each of the Sync Pipe worker threads as they process
 * entries.
 *
 * If the original ChangeRecord has the full entry already set on it (which
 * can be done using {@link ChangeRecord.Builder#fullEntry(Entry)}),
* then this method will not get called, and the Data Sync Server will
* automatically use the full entry from the ChangeRecord. In this case, the
* implementation can always return {@code null}.
*
* @param operation
* the SyncOperation which identifies the source "entry" to
* fetch. The ChangeRecord can be obtained by calling
 *          <code>operation.getChangeRecord()</code>.
* These ChangeRecords are generated by
* {@link #getNextBatchOfChanges(int, AtomicLong)}
* or by
* {@link #listAllEntries(BlockingQueue)}.
*
* @return a full LDAP Entry, or null if no such entry exists.
* @throws EndpointException
* if there is an error fetching the entry
*/
@Override
public Entry fetchEntry(final SyncOperation operation)
throws EndpointException
{
ChangeRecord record = operation.getChangeRecord();
throw new IllegalStateException(
"fetchEntry() should not be called because the full entry is set " +
"on the ChangeRecord: " + record.toString());
}
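The row-to-entry mapping described in the class comment (the 'uid' column supplies the RDN, and the DN is built as uid={uid},{baseDN}) can be sketched in isolation. The class and method names below are illustrative only and are not part of the extension's API; the real implementation parses rows with OpenCSV and builds UnboundID `Entry` objects, and this naive sketch does not escape DN special characters or handle quoted commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical standalone helper illustrating the CSV-row-to-DN mapping
// described in the class comment. Not part of the extension's actual API.
public class CsvRowMapping
{
  // Build the entry DN from a parsed CSV row; the 'uid' column is required.
  public static String buildDn(final Map<String, String> row,
                               final String baseDn)
  {
    final String uid = row.get("uid");
    if ((uid == null) || uid.isEmpty())
    {
      throw new IllegalArgumentException("CSV row has no 'uid' value");
    }
    return "uid=" + uid + "," + baseDn;
  }

  // Pair each header column name with the corresponding row value. This
  // naive split does not handle quoted or escaped commas; the real
  // extension uses the OpenCSV parser for that.
  public static Map<String, String> parseRow(final String headerLine,
                                             final String rowLine)
  {
    final String[] names  = headerLine.split(",");
    final String[] values = rowLine.split(",");
    if (names.length != values.length)
    {
      throw new IllegalArgumentException("Row/header column count mismatch");
    }
    final Map<String, String> row = new LinkedHashMap<>();
    for (int i = 0; i < names.length; i++)
    {
      row.put(names[i].trim(), values[i].trim());
    }
    return row;
  }

  public static void main(final String[] args)
  {
    final Map<String, String> row =
         parseRow("uid,givenName,sn", "jdoe,John,Doe");
    // Prints: uid=jdoe,ou=people,dc=example,dc=com
    System.out.println(buildDn(row, "ou=people,dc=example,dc=com"));
  }
}
```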
/**
* Provides a way for the Data Sync Server to acknowledge back to the
* script which sync operations it has processed. This method should update
* the official startpoint which was set by
* {@link #setStartpoint(SetStartpointOptions)} and is
* returned by {@link #getStartpoint()}.
*
* IMPORTANT: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this extension (via
* this method). Otherwise it will be possible for changes to be missed when
* the Data Sync Server is restarted or a connection error occurs.
*
* @param completedOps
* a list of {@link SyncOperation}s that have finished processing.
* The records are listed in the order they were first detected.
* @throws EndpointException
* if there is an error acknowledging the changes back to the
* database
*/
@Override
public void acknowledgeCompletedOps(
      final LinkedList<SyncOperation> completedOps)
      throws EndpointException

/**
 * Return the next batch of change records from the source.
 *
 * On the first invocation, this should return changes starting from the
* startpoint that was set by
* {@link #setStartpoint(SetStartpointOptions)}. This method is also
* responsible for updating the internal state such that subsequent
* invocations do not return duplicate changes.
*
 * The resulting list should be limited by <code>maxChanges</code>. The
 * <code>numStillPending</code> reference should be set to the estimated
 * number of changes that haven't yet been retrieved from the source endpoint
 * when this method returns, or zero if all the current changes have been
 * retrieved.
 *
* IMPORTANT: While this method needs to keep track of which changes
* have already been returned so that it does not return them again, it should
* NOT modify the official startpoint. The internal value for the
* startpoint should only be updated after a sync operation is acknowledged
* back to this script (via
* {@link #acknowledgeCompletedOps(LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs. The
* startpoint should not change as a result of this method.
*
* This method does not need to be thread-safe. It will be invoked
* repeatedly by a single thread, based on the polling interval set in the
* Sync Pipe configuration.
*
* @param maxChanges
* the maximum number of changes to retrieve
* @param numStillPending
* this should be set to the number of unretrieved changes that
* are still pending after this batch has been retrieved. This will
* be passed in
* as zero, and may be left that way if the actual value cannot be
* determined.
* @return a list of {@link ChangeRecord} instances, each
* corresponding to a single change at the source endpoint.
* If there are no new changes to return, this method should return
* an empty list.
* @throws EndpointException
* if there is any error while retrieving the next batch of changes
*/
@Override
public List<ChangeRecord> getNextBatchOfChanges(
                               final int maxChanges,
                               final AtomicLong numStillPending)
    throws EndpointException

/**
 * Gets a list of all the entries in the source endpoint. This is used by
 * the 'resync' command line tool.
 *
 * The <code>outputQueue</code> should contain {@link ChangeRecord} objects
 * with the <code>ChangeType</code> set to <code>null</code> to indicate that
 * these are resync operations.
 *
* This method should not return until all the entries at the source
* have been added to the output queue. Separate threads will concurrently
* drain entries from the queue and process them. The queue typically should
* not contain full entries, but rather ChangeRecord objects which identify
* the full source entries. These objects are then individually passed in to
* {@link #fetchEntry(SyncOperation)}. Therefore, it is important to make sure
* that the ChangeRecord instances contain enough identifiable information
* (e.g. primary keys) for each entry so that the entry can be found again.
*
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
*
* Alternatively, the full entry can be set on the ChangeRecord within this
* method, which will cause the "fetch full entry" step to be skipped. In this
* case the Data Sync Server will just use the entry specified on the
* ChangeRecord.
*
* If the total set of entries is very large, it is fine to split up the work
* into multiple network queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
*
* @param outputQueue
* a queue of ChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(SyncOperation)}
* @throws EndpointException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(final BlockingQueue<ChangeRecord> outputQueue)
    throws EndpointException

/**
 * Gets a list of entries from the source endpoint, as specified by the
 * given file input. This is used by the 'resync' command line tool.
 *
 * The format for the <code>inputLines</code> (e.g. the content of the file)
 * is user-defined; it may be key/value pairs, primary keys, or full SQL
 * statements, for example. The use of this method is triggered via the
 * --sourceInputFile argument on the resync CLI. The
 * <code>outputQueue</code> should contain {@link ChangeRecord} objects
 * with the <code>ChangeType</code> set to <code>null</code> to indicate
 * that these are resync operations.
 *
* This method should not return until all the entries specified by the input
* file have been added to the output queue. Separate threads will
* concurrently drain entries from the queue and process them. The queue
* typically should not contain full entries, but rather ChangeRecord
* objects which identify the full source entries. These objects are then
* individually passed in to {@link #fetchEntry(SyncOperation)}. Therefore,
* it is important to make sure that the ChangeRecord instances
* contain enough identifiable information (e.g. primary keys) for each entry
* so that the entry can be found again.
*
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
*
* Alternatively, the full entry can be set on the ChangeRecord within this
* method, which will cause the "fetch full entry" step to be skipped. In this
* case the Data Sync Server will just use the entry specified on the
* ChangeRecord.
*
* If the total set of entries is very large, it is fine to split up the work
* into multiple network queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
*
* @param inputLines
* an Iterator containing the lines from the specified input file to
* resync (this is specified on the CLI for the resync command).
* These lines can be any format, for example a set of primary keys,
* a set of WHERE clauses, a set of full SQL queries, etc.
* @param outputQueue
* a queue of ChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(SyncOperation)}
* @throws EndpointException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(final Iterator<String> inputLines,
                           final BlockingQueue<ChangeRecord> outputQueue)
    throws EndpointException
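The bounded-queue behavior described in the listAllEntries javadoc (the producer blocks when the output queue is full, so even a very large source cannot grow the queue without bound) can be illustrated with a standalone java.util.concurrent sketch. The class name, identifiers, and the small capacity below are illustrative; the server's actual queue capacity is fixed at 1000 and carries ChangeRecord objects rather than strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Standalone sketch of the resync producer/consumer handoff: put() blocks
// when the bounded queue is full, while separate consumer threads drain it.
public class ResyncQueueSketch
{
  // Enqueue one identifier per source entry, blocking when the queue is
  // full. In the real extension, each queued item would be a ChangeRecord
  // holding enough identifying information to fetch the full entry later.
  public static void produce(final List<String> ids,
                             final BlockingQueue<String> queue)
      throws InterruptedException
  {
    for (final String id : ids)
    {
      queue.put(id);  // blocks until a consumer makes space available
    }
  }

  public static void main(final String[] args) throws Exception
  {
    final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    final List<String> ids = List.of("jdoe", "asmith", "bjones");

    final Thread producer = new Thread(() -> {
      try
      {
        produce(ids, queue);
      }
      catch (final InterruptedException e)
      {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    // A consumer thread would normally fetch the full entry for each queued
    // identifier; here we simply drain them in order.
    final List<String> drained = new ArrayList<>();
    for (int i = 0; i < ids.size(); i++)
    {
      drained.add(queue.take());
    }
    producer.join();
    // Prints: [jdoe, asmith, bjones]
    System.out.println(drained);
  }
}
```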