com.unboundid.directory.sdk.sync.scripting
Class ScriptedJDBCSyncSource

java.lang.Object
  extended by com.unboundid.directory.sdk.sync.scripting.ScriptedJDBCSyncSource
All Implemented Interfaces:
Configurable

@Extensible
@SynchronizationServerExtension(appliesToLocalContent=false,
                                appliesToSynchronizedContent=true)
public abstract class ScriptedJDBCSyncSource
extends java.lang.Object
implements Configurable

This class defines an API that must be implemented by scripted extensions in order to synchronize data out of a relational database. Since the UnboundID Synchronization Server is LDAP-centric, this API allows you to take database content and convert it into LDAP entries which can then be processed by the Synchronization Server. The lifecycle of a sync operation is as follows:

  1. Detect change at the synchronization source
  2. Fetch full source entry
  3. Perform any mappings and compute the equivalent destination entry
  4. Fetch full destination entry
  5. Diff the computed destination entry and actual (fetched) destination entry
  6. Apply the minimal set of changes at the destination to bring it in sync
This implies that the fetchEntry(TransactionContext, SyncOperation) method will be called once for every change that is returned by getNextBatchOfChanges(TransactionContext, int, AtomicLong).
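The call pattern above can be illustrated with a small, single-threaded sketch. It polls for a batch of change hints, fetches the full entry once per hint, and then acknowledges the completed hints. `ChangeSource` and `ChangeHint` are hypothetical, simplified stand-ins for this class and for DatabaseChangeRecord. The real server processes changes on worker threads and acknowledges them periodically, so treat this only as an outline of the ordering.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for DatabaseChangeRecord: a hint that a row changed, not its data.
final class ChangeHint {
  final long changeNumber;
  final String primaryKey;

  ChangeHint(long changeNumber, String primaryKey) {
    this.changeNumber = changeNumber;
    this.primaryKey = primaryKey;
  }
}

// Hypothetical, simplified mirror of the three core abstract methods.
interface ChangeSource {
  List<ChangeHint> getNextBatchOfChanges(int maxChanges, AtomicLong numStillPending);

  Map<String, String> fetchEntry(ChangeHint hint); // null if the row no longer exists

  void acknowledgeCompletedOps(LinkedList<ChangeHint> completedOps);
}

final class SyncDriverSketch {
  /** Runs one polling cycle and returns the entries that were fetched. */
  static List<Map<String, String>> pollOnce(ChangeSource source, int maxChanges) {
    AtomicLong pending = new AtomicLong(0L);
    List<ChangeHint> batch = source.getNextBatchOfChanges(maxChanges, pending); // step 1
    List<Map<String, String>> fetched = new ArrayList<>();
    LinkedList<ChangeHint> completed = new LinkedList<>();
    for (ChangeHint hint : batch) {
      Map<String, String> entry = source.fetchEntry(hint); // step 2: once per change
      if (entry != null) {
        fetched.add(entry); // steps 3-6 (map, fetch destination, diff, apply) would go here
      }
      completed.add(hint);
    }
    // Acknowledge only after processing, so the startpoint never runs ahead of the work.
    if (!completed.isEmpty()) {
      source.acknowledgeCompletedOps(completed);
    }
    return fetched;
  }
}
```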

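During realtime synchronization (i.e. when a Sync Pipe is running), there is a sliding window of changes being processed, and this API distinguishes between several points along that window:

  1. Changes that the Synchronization Server has processed and acknowledged back to the script via acknowledgeCompletedOps(TransactionContext, LinkedList). The startpoint returned by getStartpoint() marks the boundary of this region.
  2. Changes that have been returned by getNextBatchOfChanges(TransactionContext, int, AtomicLong) but have not yet been acknowledged.
  3. Changes that have not yet been returned by getNextBatchOfChanges(TransactionContext, int, AtomicLong).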

In several places a TransactionContext is provided, which allows controlled access to the target database. By default, methods in this class are always provided with a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether the method returned normally or threw an exception. Implementers may optionally perform their own transaction management within these methods if necessary.

Several of these methods throw SQLException, which should be used for any database access error. For other types of errors, runtime exceptions may be used (IllegalStateException, NullPointerException, etc.). The Synchronization Server automatically retries failed operations, up to a configurable number of attempts. The one exception to this rule is a SQLException whose SQL state string begins with "08": this indicates a connection error, and in this case the operation is retried indefinitely.
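The retry rule keys off the SQL state string: codes beginning with "08" denote connection errors. The sketch below shows that check, plus a hypothetical helper (`wrap`, not part of this API) that an implementation might use when rethrowing a low-level failure with a friendlier message, so the original SQL state is preserved and a lost connection is still recognized as one.

```java
import java.sql.SQLException;

final class SqlStateSketch {
  /** True if the SQL state begins with "08", i.e. a connection error. */
  static boolean isConnectionError(SQLException e) {
    String state = e.getSQLState();
    return state != null && state.startsWith("08");
  }

  /**
   * Hypothetical helper: builds a new SQLException that keeps the cause's SQL
   * state, so the retry classification above still applies after rethrowing.
   */
  static SQLException wrap(String message, SQLException cause) {
    return new SQLException(message, cause.getSQLState(), cause);
  }
}
```

Throwing a plain `new SQLException(message)` instead would leave the SQL state null, and the failure would then fall under the limited-retry rule.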

Configuring Groovy Scripted JDBC Sync Sources

In order to configure a scripted JDBC sync source based on this API and written in the Groovy scripting language, use a command like:
      dsconfig create-sync-source \
           --source-name "{source-name}" \
           --type groovy-scripted-jdbc \
           --set "server:{server-name}" \
           --set "script-class:{class-name}" \
           --set "script-argument:{name=value}"
 
where "{source-name}" is the name to use for the JDBC sync source instance, "{server-name}" is the name of the JDBC external server that will be used as the sync source, "{class-name}" is the fully-qualified name of the Groovy class written using this API, and "{name=value}" represents name-value pairs for any arguments to provide to the JDBC sync source. If multiple arguments should be provided to the JDBC sync source, then the "--set script-argument:{name=value}" option should be provided multiple times.


Constructor Summary
ScriptedJDBCSyncSource()
           
 
Method Summary
abstract  void acknowledgeCompletedOps(TransactionContext ctx, java.util.LinkedList<SyncOperation> completedOps)
          Provides a way for the Synchronization Server to acknowledge back to the script which sync operations it has processed.
 int cleanupChangelog(TransactionContext ctx, long maxAgeMillis)
          Performs a cleanup of the changelog table (if desired).
 void defineConfigArguments(com.unboundid.util.args.ArgumentParser parser)
          Updates the provided argument parser to define any configuration arguments which may be used by this extension.
abstract  com.unboundid.ldap.sdk.Entry fetchEntry(TransactionContext ctx, SyncOperation operation)
          Return a full source entry (in LDAP form) from the database, corresponding to the DatabaseChangeRecord that is passed in through the SyncOperation.
 void finalizeJDBCSyncSource(TransactionContext ctx)
          This hook is called when a Sync Pipe shuts down, when the resync process shuts down, or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished.
abstract  java.util.List<DatabaseChangeRecord> getNextBatchOfChanges(TransactionContext ctx, int maxChanges, java.util.concurrent.atomic.AtomicLong numStillPending)
          Return the next batch of change records from the database.
abstract  java.io.Serializable getStartpoint()
          Gets the current value of the startpoint for change detection.
 void initializeJDBCSyncSource(TransactionContext ctx, SyncServerContext serverContext, JDBCSyncSourceConfig config, com.unboundid.util.args.ArgumentParser parser)
          This hook is called when a Sync Pipe first starts up, when the resync process first starts up, or when the set-startpoint subcommand is called from the realtime-sync command line tool.
 void listAllEntries(TransactionContext ctx, java.util.Iterator<java.lang.String> inputLines, java.util.concurrent.BlockingQueue<DatabaseChangeRecord> outputQueue)
          Deprecated. 
 void listAllEntries(TransactionContext ctx, java.lang.String entryType, java.util.concurrent.BlockingQueue<DatabaseChangeRecord> outputQueue)
          Gets a list of all the entries in the database for a given entry type.
 void listAllEntries(TransactionContext ctx, java.lang.String entryType, java.util.Iterator<java.lang.String> inputLines, java.util.concurrent.BlockingQueue<DatabaseChangeRecord> outputQueue)
          Gets a list of all the entries in the database from a given file input.
abstract  void setStartpoint(TransactionContext ctx, SetStartpointOptions options)
          This method should effectively set the starting point for synchronization to the place specified by the options parameter.
 
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
 

Constructor Detail

ScriptedJDBCSyncSource

public ScriptedJDBCSyncSource()
Method Detail

defineConfigArguments

public void defineConfigArguments(com.unboundid.util.args.ArgumentParser parser)
                           throws com.unboundid.util.args.ArgumentException
Updates the provided argument parser to define any configuration arguments which may be used by this extension. The argument parser may also be updated to define relationships between arguments (e.g., to specify required, exclusive, or dependent argument sets).

Specified by:
defineConfigArguments in interface Configurable
Parameters:
parser - The argument parser to be updated with the configuration arguments which may be used by this extension.
Throws:
com.unboundid.util.args.ArgumentException - If a problem is encountered while updating the provided argument parser.

initializeJDBCSyncSource

public void initializeJDBCSyncSource(TransactionContext ctx,
                                     SyncServerContext serverContext,
                                     JDBCSyncSourceConfig config,
                                     com.unboundid.util.args.ArgumentParser parser)
This hook is called when a Sync Pipe first starts up, when the resync process first starts up, or when the set-startpoint subcommand is called from the realtime-sync command line tool. Any initialization of this sync source should be performed here. This method should generally store the SyncServerContext in a class member so that it can be used elsewhere in the implementation.

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

The default implementation is empty.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
serverContext - A handle to the server context for the server in which this extension is running.
config - The general configuration for this sync source.
parser - The argument parser which has been initialized from the configuration for this JDBC sync source.

finalizeJDBCSyncSource

public void finalizeJDBCSyncSource(TransactionContext ctx)
This hook is called when a Sync Pipe shuts down, when the resync process shuts down, or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished. Any clean up of this sync source should be performed here.

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

The default implementation is empty.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.

setStartpoint

@ThreadSafety(level=METHOD_NOT_THREADSAFE)
public abstract void setStartpoint(TransactionContext ctx,
                                   SetStartpointOptions options)
                            throws java.sql.SQLException
This method should effectively set the starting point for synchronization to the place specified by the options parameter. This should cause all changes previous to the specified start point to be disregarded and only changes after that point to be returned by getNextBatchOfChanges(TransactionContext, int, AtomicLong).

There are several different startpoint types (see SetStartpointOptions), and this implementation is not required to support them all. If the specified startpoint type is unsupported, this method should throw an UnsupportedOperationException.

IMPORTANT: The RESUME_AT_SERIALIZABLE startpoint type must be supported by your implementation, because this is used when a Sync Pipe first starts up.
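A minimal sketch of how an implementation might dispatch on the startpoint type, assuming a changelog keyed by an increasing change number. `StartpointKind` is a hypothetical subset of types written for this example (consult SetStartpointOptions for the real ones), and the constructor argument stands in for a `SELECT MAX(change_number)` query against a hypothetical changelog table. RESUME_AT_SERIALIZABLE restores the value previously returned by getStartpoint(); anything unsupported throws UnsupportedOperationException, as required above.

```java
import java.io.Serializable;

// Hypothetical subset of startpoint types; not the real SetStartpointOptions enum.
enum StartpointKind { RESUME_AT_SERIALIZABLE, BEGINNING_OF_CHANGELOG, END_OF_CHANGELOG, RESUME_AT_CHANGE_TIME }

final class StartpointSketch {
  private long startpoint;             // last acknowledged change number
  private final long maxChangeNumber;  // stand-in for the newest change number in the changelog

  StartpointSketch(long maxChangeNumber) {
    this.maxChangeNumber = maxChangeNumber;
  }

  void setStartpoint(StartpointKind kind, Serializable value) {
    switch (kind) {
      case RESUME_AT_SERIALIZABLE:
        // Required: value is whatever getStartpoint() returned before the pipe stopped.
        startpoint = (Long) value;
        break;
      case BEGINNING_OF_CHANGELOG:
        startpoint = 0L;
        break;
      case END_OF_CHANGELOG:
        startpoint = maxChangeNumber; // skip everything already logged
        break;
      default:
        throw new UnsupportedOperationException("Unsupported startpoint type: " + kind);
    }
  }

  Serializable getStartpoint() {
    return startpoint; // boxed to Long, which is Serializable
  }
}
```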

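This method can be called from two different contexts:

  1. When the set-startpoint subcommand of the realtime-sync command line tool is used.
  2. When a Sync Pipe first starts up, in which case the RESUME_AT_SERIALIZABLE startpoint type is used.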

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
options - an object which indicates where exactly to start synchronizing (e.g. the end of the changelog, a specific change number, a certain time ago, etc.)
Throws:
java.sql.SQLException - if there is any error while setting the start point

getStartpoint

@ThreadSafety(level=METHOD_NOT_THREADSAFE)
public abstract java.io.Serializable getStartpoint()
Gets the current value of the startpoint for change detection. This is the "bookmark" which indicates which changes have already been processed and which have not. In most cases, a change number is used to detect changes and is managed by the Synchronization Server, in which case this implementation needs only to return the latest acknowledged change number. In other cases, the return value may correspond to a different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. In any case, this method should return the value that is updated by acknowledgeCompletedOps(TransactionContext, LinkedList).

This method is called periodically and the return value is saved in the persistent state for the Sync Pipe that uses this script as its Sync Source.

IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this script (via acknowledgeCompletedOps(TransactionContext, LinkedList)). Otherwise it will be possible for changes to be missed when the Synchronization Server is restarted or a connection error occurs.
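To make the two kinds of state explicit, here is a minimal sketch that keeps a read cursor (how far change detection has read) separate from the official startpoint (what getStartpoint() reports). Only acknowledgement moves the startpoint. `AckTracker` is a hypothetical helper; completed operations are modeled as plain change numbers rather than SyncOperation objects, and it assumes acknowledgements arrive in detection order, as stated for acknowledgeCompletedOps.

```java
import java.io.Serializable;
import java.util.LinkedList;

final class AckTracker {
  // Official startpoint: reported by getStartpoint(), advanced only by acknowledgements.
  // volatile so a periodic reader thread sees the latest value.
  private volatile long acknowledged;
  // Read cursor: used to avoid returning duplicates; never persisted.
  private long lastReturned;

  AckTracker(long startpoint) {
    this.acknowledged = startpoint;
    this.lastReturned = startpoint;
  }

  /** Called after a batch is handed out; moves only the read cursor. */
  void markReturned(long highestChangeNumber) {
    lastReturned = Math.max(lastReturned, highestChangeNumber);
  }

  /** Mirrors acknowledgeCompletedOps: the list is in the order changes were detected. */
  void acknowledge(LinkedList<Long> completedChangeNumbers) {
    if (!completedChangeNumbers.isEmpty()) {
      acknowledged = Math.max(acknowledged, completedChangeNumbers.getLast());
    }
  }

  Serializable getStartpoint() {
    return acknowledged;
  }

  long readCursor() {
    return lastReturned;
  }
}
```

If the server restarts after markReturned but before acknowledge, resuming from getStartpoint() replays the unacknowledged changes instead of losing them.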

Returns:
a value to store in the persistent state for the Sync Pipe. This is usually a change number, but if a changelog table is not used to detect changes, this value should represent some other token to pass into setStartpoint(TransactionContext, SetStartpointOptions) when the sync pipe starts up.

getNextBatchOfChanges

public abstract java.util.List<DatabaseChangeRecord> getNextBatchOfChanges(TransactionContext ctx,
                                                                           int maxChanges,
                                                                           java.util.concurrent.atomic.AtomicLong numStillPending)
                                                                    throws java.sql.SQLException
Return the next batch of change records from the database. Change records are just hints that a change happened; they do not include the actual data of the change. In an effort to never synchronize stale data, the Synchronization Server will go back and fetch the full source entry for each change record.

On the first invocation, this should return changes starting from the startpoint that was set by setStartpoint(TransactionContext, SetStartpointOptions). This method is responsible for updating the internal state such that subsequent invocations do not return duplicate changes.

The returned list should contain at most maxChanges records. The numStillPending reference should be set to the estimated number of changes that have not yet been retrieved from the changelog table when this method returns, or zero if all the current changes have been retrieved.

IMPORTANT: While this method needs to keep track of which changes have already been returned so that it does not return them again, it should NOT modify the official startpoint. The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this script (via acknowledgeCompletedOps(TransactionContext, LinkedList)). Otherwise it will be possible for changes to be missed when the Synchronization Server is restarted or a connection error occurs.
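A minimal sketch of the batching rules, using an in-memory sorted map as a stand-in for a changelog table keyed by change number (an assumption; your change tracking may differ). In SQL terms, the `tailMap` lookup corresponds roughly to `SELECT ... WHERE change_number > ? ORDER BY change_number` with a row limit. The sketch caps the batch at maxChanges, advances only its private read cursor, and sets numStillPending to the number of unread rows left over.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;

final class BatchReaderSketch {
  private final NavigableMap<Long, String> changelog; // change number -> primary key
  private long cursor; // highest change number already returned (NOT the official startpoint)

  BatchReaderSketch(NavigableMap<Long, String> changelog, long startpoint) {
    this.changelog = changelog;
    this.cursor = startpoint;
  }

  /** Returns up to maxChanges change numbers after the cursor, oldest first. */
  List<Long> getNextBatchOfChanges(int maxChanges, AtomicLong numStillPending) {
    NavigableMap<Long, String> unread = changelog.tailMap(cursor, false); // exclusive of cursor
    List<Long> batch = new ArrayList<>();
    for (Map.Entry<Long, String> row : unread.entrySet()) {
      if (batch.size() >= maxChanges) {
        break;
      }
      batch.add(row.getKey());
    }
    if (!batch.isEmpty()) {
      cursor = batch.get(batch.size() - 1); // prevents duplicates on the next call
    }
    // The view's bounds were fixed when it was created, so this counts rows left over.
    numStillPending.set(unread.size() - batch.size());
    return batch; // empty list, never null, when there is nothing new
  }
}
```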

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

This method does not need to be thread-safe. It will be invoked repeatedly by a single thread, based on the polling interval set in the Sync Pipe configuration.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
maxChanges - the maximum number of changes to retrieve
numStillPending - this should be set to the number of unretrieved changes that are still pending after this batch has been retrieved. This will be passed in as zero, and may be left that way if the actual value cannot be determined.
Returns:
a list of DatabaseChangeRecord instances, each corresponding to a row in the changelog table (or the equivalent if some other change tracking mechanism is being used). If there are no new changes to return, this method should return an empty list.
Throws:
java.sql.SQLException - if there is any error while retrieving the next batch of changes

fetchEntry

public abstract com.unboundid.ldap.sdk.Entry fetchEntry(TransactionContext ctx,
                                                        SyncOperation operation)
                                                 throws java.sql.SQLException
Return a full source entry (in LDAP form) from the database, corresponding to the DatabaseChangeRecord that is passed in through the SyncOperation. This method should perform any queries necessary to gather the latest values for all the attributes to be synchronized.

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

This method must be thread safe, as it will be called repeatedly and concurrently by each of the Sync Pipe worker threads as they process entries.
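One straightforward way to meet the thread-safety requirement is to keep all per-call state in local variables and share only immutable configuration. The sketch below maps one database row to an entry under that rule. `FlatEntry` is a hypothetical stand-in for com.unboundid.ldap.sdk.Entry, the `USER_ID` column and DN layout are assumptions for illustration, and the row is modeled as a column-to-value map rather than a live ResultSet.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for com.unboundid.ldap.sdk.Entry: a DN plus attributes.
final class FlatEntry {
  final String dn;
  final Map<String, List<String>> attributes;

  FlatEntry(String dn, Map<String, List<String>> attributes) {
    this.dn = dn;
    this.attributes = Collections.unmodifiableMap(attributes);
  }
}

final class EntryMapperSketch {
  // Immutable after construction, so one instance can be shared by all worker threads.
  private final String baseDn;

  EntryMapperSketch(String baseDn) {
    this.baseDn = baseDn;
  }

  /** Maps a row (column name to value) to an entry, or returns null if the row is gone. */
  FlatEntry toEntry(Map<String, Object> row) {
    if (row == null) {
      return null; // e.g. the row was deleted after the change was logged
    }
    Map<String, List<String>> attrs = new LinkedHashMap<>(); // method-local: no shared state
    for (Map.Entry<String, Object> col : row.entrySet()) {
      if (col.getValue() != null) { // skip NULL columns rather than emit empty values
        attrs.put(col.getKey().toLowerCase(Locale.ROOT),
            Collections.singletonList(col.getValue().toString()));
      }
    }
    // Assumes USER_ID is present and needs no DN escaping; real code should escape it.
    String dn = "uid=" + row.get("USER_ID") + "," + baseDn;
    return new FlatEntry(dn, attrs);
  }
}
```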

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
operation - the SyncOperation which identifies the database "entry" to fetch. The DatabaseChangeRecord can be obtained by calling operation.getDatabaseChangeRecord(); it is the record that was returned by getNextBatchOfChanges(TransactionContext, int, AtomicLong) or added to the output queue by listAllEntries(TransactionContext, String, BlockingQueue).
Returns:
a full LDAP Entry, or null if no such entry exists.
Throws:
java.sql.SQLException - if there is an error fetching the entry

acknowledgeCompletedOps

public abstract void acknowledgeCompletedOps(TransactionContext ctx,
                                             java.util.LinkedList<SyncOperation> completedOps)
                                      throws java.sql.SQLException
Provides a way for the Synchronization Server to acknowledge back to the script which sync operations it has processed. This method should update the official startpoint which was set by setStartpoint(TransactionContext, SetStartpointOptions) and is returned by getStartpoint().

IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this script (via this method). Otherwise it will be possible for changes to be missed when the Synchronization Server is restarted or a connection error occurs.

A TransactionContext is provided in case the acknowledgment needs to make it all the way back to the database itself (for example, if you are using Oracle's Change Data Capture). The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
completedOps - a list of SyncOperations that have finished processing. The records are listed in the order they were first detected.
Throws:
java.sql.SQLException - if there is an error acknowledging the changes back to the database

cleanupChangelog

public int cleanupChangelog(TransactionContext ctx,
                            long maxAgeMillis)
                     throws java.sql.SQLException
Performs a cleanup of the changelog table (if desired). There is a background thread that periodically invokes this method. It should remove any rows in the changelog table that are more than maxAgeMillis milliseconds old.

NOTE: If the system clock on the database server is not in sync with the system clock on the Synchronization Server, this method should query the database for its current time in order to determine the cut-off point for deleting changelog records.
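A minimal sketch of the clock-safe approach described in the note: read the current time from the database, subtract maxAgeMillis, and delete older rows. It works on a plain java.sql.Connection (how you obtain one from the TransactionContext is not shown here). The table and column names (`ubid_changelog`, `mod_time`) are hypothetical, and `SELECT CURRENT_TIMESTAMP` assumes a dialect that allows a SELECT without FROM (Oracle, for instance, would need `FROM DUAL`).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;

final class ChangelogCleanupSketch {
  /** Cut-off point: the database's "now" minus the maximum age. */
  static Timestamp cutoff(Timestamp dbNow, long maxAgeMillis) {
    return new Timestamp(dbNow.getTime() - maxAgeMillis);
  }

  /** Deletes changelog rows older than maxAgeMillis, measured by the database clock. */
  static int cleanupChangelog(Connection conn, long maxAgeMillis) throws SQLException {
    Timestamp dbNow;
    try (Statement st = conn.createStatement();
         ResultSet rs = st.executeQuery("SELECT CURRENT_TIMESTAMP")) {
      if (!rs.next()) {
        throw new SQLException("Could not read the database time");
      }
      dbNow = rs.getTimestamp(1);
    }
    // Hypothetical table and column names.
    try (PreparedStatement ps = conn.prepareStatement(
        "DELETE FROM ubid_changelog WHERE mod_time < ?")) {
      ps.setTimestamp(1, cutoff(dbNow, maxAgeMillis));
      return ps.executeUpdate(); // number of rows deleted, as this method must return
    }
  }
}
```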

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

If a separate mechanism will be used to manage the changelog table, this method may be implemented as a no-op and always return zero. This is how the default implementation behaves.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
maxAgeMillis - the period of time (in milliseconds) after which a changelog table record should be deleted
Returns:
the number of rows that were deleted from the changelog table
Throws:
java.sql.SQLException - if there is an error purging records from the changelog table

listAllEntries

public void listAllEntries(TransactionContext ctx,
                           java.lang.String entryType,
                           java.util.concurrent.BlockingQueue<DatabaseChangeRecord> outputQueue)
                    throws java.sql.SQLException
Gets a list of all the entries in the database for a given entry type. This is used by the 'resync' command line tool. The default implementation throws an UnsupportedOperationException; subclasses should override it if the resync functionality is needed.

The entryType is user-defined; it will be passed in on the command line for resync. The outputQueue should contain DatabaseChangeRecord objects with the ChangeType set to resync.

This method should not return until all the entries of the given entryType have been added to the output queue. Separate threads will concurrently drain entries from the queue and process them. The queue should not actually contain full entries, but rather DatabaseChangeRecord objects which identify the full database entries. These objects are then individually passed in to fetchEntry(TransactionContext, SyncOperation). Therefore, it is important to make sure that the DatabaseChangeRecord instances contain enough identifiable information (e.g. primary keys) for each entry so that the entry can be found again.

The lifecycle of resync is similar to that of real-time sync, with a few differences:

  1. Stream out a list of all IDs in the database (for a given entryType)
  2. Fetch full source entry for an ID
  3. Perform any mappings and compute the equivalent destination entry
  4. Fetch full destination entry
  5. Diff the computed destination entry and actual destination entry
  6. Apply the minimal set of changes at the destination to bring it in sync
If the total set of entries is very large, it is fine to split up the work into multiple database queries within this method. The queue will not grow out of control because it blocks when it becomes full. The queue capacity is fixed at 1000.
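A minimal sketch of a producer that splits the work into pages and relies on the blocking queue for back-pressure. `nextPage` stands in for a keyset-paged query such as `SELECT id FROM ... WHERE id > ? ORDER BY id` with a row limit; `ResyncHint` is a hypothetical stand-in for a DatabaseChangeRecord with its ChangeType set to resync, and the "users" entry type is invented for illustration. `BlockingQueue.put` waits while the queue is full, and the method returns only after every key has been queued.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.function.LongFunction;

final class ResyncProducerSketch {
  // Hypothetical stand-in for a DatabaseChangeRecord with ChangeType resync.
  static final class ResyncHint {
    final String entryType;
    final long primaryKey;

    ResyncHint(String entryType, long primaryKey) {
      this.entryType = entryType;
      this.primaryKey = primaryKey;
    }
  }

  /** Queues every key of entryType; nextPage(afterKey) returns an empty list when done. */
  static int listAllEntries(String entryType, LongFunction<List<Long>> nextPage,
                            BlockingQueue<ResyncHint> outputQueue) throws InterruptedException {
    if (!"users".equals(entryType)) { // hypothetical: the only entry type this sketch knows
      throw new IllegalArgumentException("Unknown entry type: " + entryType);
    }
    int count = 0;
    long after = Long.MIN_VALUE;
    List<Long> page;
    while (!(page = nextPage.apply(after)).isEmpty()) {
      for (long key : page) {
        outputQueue.put(new ResyncHint(entryType, key)); // blocks while the queue is full
        count++;
        after = key; // next page starts after the last key seen
      }
    }
    return count;
  }
}
```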

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
entryType - the type of database entry to be fetched (this is specified on the CLI for the resync command)
outputQueue - a queue of DatabaseChangeRecord objects which will be individually fetched via fetchEntry(TransactionContext, SyncOperation)
Throws:
java.sql.SQLException - if there is an error retrieving the list of entries to resync

listAllEntries

@Deprecated
public void listAllEntries(TransactionContext ctx,
                           java.util.Iterator<java.lang.String> inputLines,
                           java.util.concurrent.BlockingQueue<DatabaseChangeRecord> outputQueue)
                    throws java.sql.SQLException
Deprecated. 

Note: This method is deprecated and may be removed in a future release. All new and existing code should be changed to use the version of this method which includes the entryType parameter.

Gets a list of all the entries in the database from a given file input. This is used by the 'resync' command line tool. The default implementation throws an UnsupportedOperationException; subclasses should override it if the resync functionality is needed for specific database records, which can be specified in the input file.

The format for the inputLines (i.e. the content of the file) is user-defined; it may be key/value pairs, primary keys, or full SQL statements, for example. The use of this method is triggered via the --sourceInputFile argument on the resync CLI. The outputQueue should contain DatabaseChangeRecord objects with the ChangeType set to resync.

This method should not return until all the entries specified by the input file have been added to the output queue. Separate threads will concurrently drain entries from the queue and process them. The queue should not actually contain full entries, but rather DatabaseChangeRecord objects which identify the full database entries. These objects are then individually passed in to fetchEntry(TransactionContext, SyncOperation). Therefore, it is important to make sure that the DatabaseChangeRecord instances contain enough identifiable information (e.g. primary keys) for each entry so that the entry can be found again.

The lifecycle of resync is similar to that of real-time sync, with a few differences:

  1. Stream out a list of all IDs in the database (using the given input file)
  2. Fetch full source entry for an ID
  3. Perform any mappings and compute the equivalent destination entry
  4. Fetch full destination entry
  5. Diff the computed destination entry and actual destination entry
  6. Apply the minimal set of changes at the destination to bring it in sync
If the total set of entries is very large, it is fine to split up the work into multiple database queries within this method. The queue will not grow out of control because it blocks when it becomes full. The queue capacity is fixed at 1000.

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
inputLines - an Iterator containing the lines from the specified input file to resync (this is specified on the CLI for the resync command). These lines can be any format, for example a set of primary keys, a set of WHERE clauses, a set of full SQL queries, etc.
outputQueue - a queue of DatabaseChangeRecord objects which will be individually fetched via fetchEntry(TransactionContext, SyncOperation)
Throws:
java.sql.SQLException - if there is an error retrieving the list of entries to resync

listAllEntries

public void listAllEntries(TransactionContext ctx,
                           java.lang.String entryType,
                           java.util.Iterator<java.lang.String> inputLines,
                           java.util.concurrent.BlockingQueue<DatabaseChangeRecord> outputQueue)
                    throws java.sql.SQLException
Gets a list of all the entries in the database from a given file input. This is used by the 'resync' command line tool. The default implementation throws an UnsupportedOperationException; subclasses should override it if the resync functionality is needed for specific database records, which can be specified in the input file.

The format for the inputLines (i.e. the content of the file) is user-defined; it may be key/value pairs, primary keys, or full SQL statements, for example. The use of this method is triggered via the --sourceInputFile argument on the resync CLI. The outputQueue should contain DatabaseChangeRecord objects with the ChangeType set to resync.

This method should not return until all the entries specified by the input file have been added to the output queue. Separate threads will concurrently drain entries from the queue and process them. The queue should not actually contain full entries, but rather DatabaseChangeRecord objects which identify the full database entries. These objects are then individually passed in to fetchEntry(TransactionContext, SyncOperation). Therefore, it is important to make sure that the DatabaseChangeRecord instances contain enough identifiable information (e.g. primary keys) for each entry so that the entry can be found again.
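Because the input format is user-defined, here is a minimal sketch for one possible choice: one numeric primary key per line, with blank lines and `#` comments skipped. This format and the `forEachKey` helper are hypothetical. Each key is handed to a sink as soon as it is parsed, so the file is streamed rather than loaded whole; in listAllEntries the sink would wrap the key (together with the entryType) in a DatabaseChangeRecord and put it on outputQueue, handling the InterruptedException that `put` can throw.

```java
import java.util.Iterator;
import java.util.function.LongConsumer;

final class InputFileParserSketch {
  /** Streams each primary key to sink; returns how many keys were found. */
  static int forEachKey(Iterator<String> inputLines, LongConsumer sink) {
    int count = 0;
    int lineNo = 0;
    while (inputLines.hasNext()) {
      String line = inputLines.next().trim();
      lineNo++;
      if (line.isEmpty() || line.startsWith("#")) {
        continue; // blank lines and comments are ignored
      }
      long key;
      try {
        key = Long.parseLong(line);
      } catch (NumberFormatException e) {
        // Report the offending line instead of silently skipping it.
        throw new IllegalArgumentException("Line " + lineNo + " is not a numeric key: " + line, e);
      }
      sink.accept(key);
      count++;
    }
    return count;
  }
}
```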

The lifecycle of resync is similar to that of real-time sync, with a few differences:

  1. Stream out a list of all IDs in the database (using the given input file)
  2. Fetch full source entry for an ID
  3. Perform any mappings and compute the equivalent destination entry
  4. Fetch full destination entry
  5. Diff the computed destination entry and actual destination entry
  6. Apply the minimal set of changes at the destination to bring it in sync
If the total set of entries is very large, it is fine to split up the work into multiple database queries within this method. The queue will not grow out of control because it blocks when it becomes full. The queue capacity is fixed at 1000.

A TransactionContext is provided, which allows controlled access to the target database. The context will contain a fresh connection (i.e. a new transaction), and the Synchronization Server will always commit or rollback the transaction automatically, depending on whether this method returns normally or throws an exception. Implementers may optionally perform their own transaction management within this method if necessary.

Parameters:
ctx - a TransactionContext which provides a valid JDBC connection to the database.
entryType - the type of database entry to be fetched (this is specified on the CLI for the resync command)
inputLines - an Iterator containing the lines from the specified input file to resync (this is specified on the CLI for the resync command). These lines can be any format, for example a set of primary keys, a set of WHERE clauses, a set of full SQL queries, etc.
outputQueue - a queue of DatabaseChangeRecord objects which will be individually fetched via fetchEntry(TransactionContext, SyncOperation)
Throws:
java.sql.SQLException - if there is an error retrieving the list of entries to resync