/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * docs/licenses/cddl.txt * or http://www.opensource.org/licenses/cddl1.php. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * docs/licenses/cddl.txt. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2010-2018 Ping Identity Corporation */ package com.unboundid.directory.sdk.examples; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.io.Serializable; import com.unboundid.ldap.sdk.Attribute; import com.unboundid.ldap.sdk.Entry; import com.unboundid.ldap.sdk.DN; import com.unboundid.util.args.ArgumentParser; import com.unboundid.util.args.ArgumentException; import com.unboundid.util.StaticUtils; import com.unboundid.directory.sdk.sync.api.JDBCSyncSource; import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig; import com.unboundid.directory.sdk.sync.types.SyncServerContext; import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord; import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord.ChangeType; import 
com.unboundid.directory.sdk.sync.types.TransactionContext; import com.unboundid.directory.sdk.sync.types.SyncOperation; import com.unboundid.directory.sdk.sync.util.ScriptUtils; import com.unboundid.directory.sdk.common.api.MonitorProvider; import com.unboundid.directory.sdk.common.types.LogSeverity; import com.unboundid.directory.sdk.common.types.RegisteredMonitorProvider; /** * This class provides a simple example of a JDBC Sync Source which will detect * changes from a single database table and return them to the Synchronization * Server so that they can be synchronized to a destination endpoint. * This implementation does not use any configuration arguments. *
* This example also demonstrates how to register an ad-hoc monitor provider
* with the server, without having to add any server-side configuration.
*/
public final class ExampleJDBCSyncSource
extends JDBCSyncSource
{
/**
* The server context, which can be used for obtaining the server state,
* logging, etc. It is stored by initializeJDBCSyncSource and used again in
* finalizeJDBCSyncSource to deregister the monitor provider.
*/
private SyncServerContext serverContext;
/** The name of the data table. */
private static final String DATA_TABLE = "DataTable";
/** The name of the changelog table (the table purged by cleanupChangelog). */
private static final String CHANGELOG_TABLE = "ChangeLog";
/**
* Handle to the ad-hoc monitor provider registered with the server during
* initializeJDBCSyncSource; it is required to deregister the provider during
* finalization.
*/
private RegisteredMonitorProvider monitorProvider;
/**
* Used to keep track of which changes have already been retrieved, so that
* they are not returned again. This is NOT the startpoint. Declared volatile
* so that updates are visible to other threads.
*/
private volatile long nextChangeNumberToRetrieve;
/**
* Used to keep track of which changes have finished processing.
* NOTE: this is the official "startpoint" for this implementation; it is the
* value handed back by getStartpoint.
*/
private volatile long lastCompletedChangeNumber;
/**
* Returns the name under which this sync source is presented to users.
*
* @return The human-readable name of this extension.
*/
@Override
public String getExtensionName()
{
final String extensionName = "Example JDBC Sync Source";
return extensionName;
}
/**
* Describes this extension for generated documentation. Every element of the
* returned array becomes its own paragraph.
*
* @return A one-paragraph description of this extension; this implementation
* never returns {@code null} or an empty array.
*/
@Override
public String[] getExtensionDescription()
{
final String paragraph =
"This extension implements a JDBC Sync Source which can be used " +
"to detect and synchronize changes from a single data table. " +
"For the sake of simplicity, the columns in the table have " +
"LDAP attribute names (cn, sn, givenname, etc). The database " +
"uses a changelog table to capture changes, and this extension " +
"is configured to manage it.";
return new String[] { paragraph };
}
/**
* Registers the configuration arguments understood by this extension. This
* example accepts no arguments, so the parser is left untouched.
*
* @param parser The argument parser that would receive any argument
* definitions (and argument relationships) for this extension.
*
* @throws ArgumentException If the parser cannot be updated. This
* implementation never throws it because it
* defines nothing.
*/
@Override
public void defineConfigArguments(final ArgumentParser parser)
throws ArgumentException
{
// Intentionally empty: this sync source exposes no configuration arguments.
}
/**
* Initializes this sync source. The server invokes this hook when a Sync Pipe
* starts, when the resync process starts, or when the set-startpoint
* subcommand of the realtime-sync tool runs.
*
* This implementation remembers the {@link SyncServerContext} for later use.
* It also registers an ad-hoc {@link MonitorProvider} with the server, so the
* change-number counters of this sync source can be observed without any
* server-side configuration.
*
* @param ctx A TransactionContext which provides a valid JDBC
* connection to the database. Not used here.
* @param serverContext The server context for the server in which this
* extension is running.
* @param config The general configuration for this sync source. It
* is passed along when registering the monitor
* provider.
* @param parser The argument parser initialized from the
* configuration of this JDBC sync source. Not used
* here.
*/
@Override
public void initializeJDBCSyncSource(final TransactionContext ctx,
final SyncServerContext serverContext,
final JDBCSyncSourceConfig config,
final ArgumentParser parser)
{
this.serverContext = serverContext;
final SimpleMonitorProvider provider = new SimpleMonitorProvider();
monitorProvider = serverContext.registerMonitorProvider(provider, config);
}
/**
* Cleans up this sync source. The server invokes this hook when a Sync Pipe
* shuts down, when the resync process shuts down, or when the set-startpoint
* subcommand of the realtime-sync tool finishes.
*
* This implementation deregisters the monitor provider that was registered
* during initialization:
* - If initialization did not complete (no server context or no registered
* provider), nothing is deregistered.
* - The stored handle is cleared afterwards, so a repeated call does not
* deregister the same provider twice.
*
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database. Not used by this implementation.
*/
@Override
public void finalizeJDBCSyncSource(final TransactionContext ctx)
{
// Initialization may have failed or never run. In that case there is
// either no server context or no registered provider to deregister.
if (serverContext != null && monitorProvider != null)
{
serverContext.deregisterMonitorProvider(monitorProvider);
}
// Drop the handle so that a second finalize call is a no-op.
monitorProvider = null;
}
/**
* This method should effectively set the starting point for synchronization
* to the place specified by the {@code options} parameter. This should
* cause all changes previous to the specified start point to be disregarded
* and only changes after that point to be returned by
* {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
*
* There are several different startpoint types (see
* {@link SetStartpointOptions}), and this implementation is not required to
* support them all. If the specified startpoint type is unsupported, this
* method should throw an {@link IllegalArgumentException}.
*
* IMPORTANT: The {@code RESUME_AT_SERIALIZABLE} startpoint type
* must be supported by your implementation, because this is used when a Sync
* Pipe first starts up.
*
* This method can be called from two different contexts: *
* This method is called periodically and the return value is saved in the * persistent state for the Sync Pipe that uses this script as its Sync * Source. *
* IMPORTANT: The internal value for the startpoint should only be * updated after a sync operation is acknowledged back to this script (via * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). * Otherwise it will be possible for changes to be missed when the * Data Sync Server is restarted or a connection error occurs. * @return a value to store in the persistent state for the Sync Pipe. This is * usually a change number, but if a changelog table is not used to * detect changes, this value should represent some other token to * pass into * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} * when the sync pipe starts up. */ @Override public Serializable getStartpoint() { return Long.valueOf(lastCompletedChangeNumber); } /** * Return a full source entry (in LDAP form) from the database, corresponding * to the {@link DatabaseChangeRecord} that is passed in through the * {@link SyncOperation}. This method should perform any queries necessary to * gather the latest values for all the attributes to be synchronized. *
* This method must be thread safe, as it will be called repeatedly and
* concurrently by each of the Sync Pipe worker threads as they process
* entries.
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param operation
* the SyncOperation which identifies the database "entry" to
* fetch. The DatabaseChangeRecord can be obtained by calling
*
* IMPORTANT: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this script (via
* this method). Otherwise it will be possible for changes to be missed when
* the Data Sync Server is restarted or a connection error occurs.
*
* A {@link TransactionContext} is provided in case the acknowledgment needs
* to make it all the way back to the database itself (for example if you were
* using Oracle's Change Data Capture).
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param completedOps
* a list of {@link SyncOperation}s that have finished processing.
* The records are listed in the order they were first detected.
* @throws SQLException
* if there is an error acknowledging the changes back to the
* database
*/
@Override
public void acknowledgeCompletedOps(
final TransactionContext ctx,
final LinkedList
* On the first invocation, this should return changes starting from the
* startpoint that was set by
* {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
* method is responsible for updating the internal state such that subsequent
* invocations do not return duplicate changes.
*
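* The resulting list should be limited by {@code maxChanges}. The
* {@code numStillPending} reference should be set to the estimated
* number of changes that haven't yet been retrieved from the changelog table
* when this method returns, or zero if all the current changes have been
* retrieved.
*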
* IMPORTANT: While this method needs to keep track of which changes
* have already been returned so that it does not return them again, it should
* NOT modify the official startpoint. The internal value for the
* startpoint should only be updated after a sync operation is acknowledged
* back to this script (via
* {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs. The
* startpoint should not change as a result of this method.
*
* This method does not need to be thread-safe. It will be invoked
* repeatedly by a single thread, based on the polling interval set in the
* Sync Pipe configuration.
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param maxChanges
* the maximum number of changes to retrieve
* @param numStillPending
* this should be set to the number of unretrieved changes that
* are still pending after this batch has been retrieved. This will
* be passed in as zero, and may be left that way if the actual value
* cannot be determined.
* @return a list of {@link DatabaseChangeRecord} instances, each
* corresponding to a row in the changelog table (or the equivalent if
* some other change tracking mechanism is being used). If there are
* no new changes to return, this method should return an empty list.
* @throws SQLException
* if there is any error while retrieving the next batch of changes
*/
@Override
public List
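* The {@code entryType} is user-defined; it will be
* passed in on the command line for resync. The {@code outputQueue}
* should contain {@link DatabaseChangeRecord} objects with the
* {@code ChangeType} set to resync.
*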
* This method should not return until all the entries of the given entryType
* have been added to the output queue. Separate threads will concurrently
* drain entries from the queue and process them. (The queue should not
* actually contain full entries, but rather DatabaseChangeRecord objects
* which identify the full database entries. These objects are then
* individually passed in to
* {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
* it is important to make sure that the DatabaseChangeRecord instances
* contain enough identifiable information (e.g. primary keys) for each entry
* so that the entry can be found again.
*
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
*
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param entryType
* the type of database entry to be fetched (this is specified
* on the CLI for the resync command)
* @param outputQueue
* a queue of DatabaseChangeRecord objects which will be individually
* fetched via
* {@link #fetchEntry(TransactionContext, SyncOperation)}
* @throws SQLException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(
final TransactionContext ctx,
final String entryType,
final BlockingQueue
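* The format for the {@code inputLines} (e.g. the content of the file)
* is user-defined; it may be key/value pairs, primary keys, or full SQL
* statements, for example. The use of this method is triggered via the
* {@code --sourceInputFile} argument on the resync CLI. The
* {@code outputQueue} should contain {@link DatabaseChangeRecord}
* objects with the {@code ChangeType} set to resync.
*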
* This method should not return until all the entries specified by the input
* file have been added to the output queue. Separate threads will
* concurrently drain entries from the queue and process them. (The queue
* should not actually contain full entries, but rather DatabaseChangeRecord
* objects which identify the full database entries. These objects are then
* individually passed in to
* {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
* it is important to make sure that the DatabaseChangeRecord instances
* contain enough identifiable information (e.g. primary keys) for each entry
* so that the entry can be found again.
*
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
*
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param entryType
* the type of database entry to be fetched (this is specified
* on the CLI for the resync command)
* @param inputLines
* an Iterator containing the lines from the specified input file to
* resync (this is specified on the CLI for the resync command).
* These lines can be any format, for example a set of primary keys,
* a set of WHERE clauses, a set of full SQL queries, etc.
* @param outputQueue
* a queue of DatabaseChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
* @throws SQLException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(
final TransactionContext ctx,
final String entryType,
final Iterator
* NOTE: If the system clock on the database server is not in sync with
* the system clock on the Data Sync Server, this method should query
* the database for its current time in order to determine the cut-off point
* for deleting changelog records.
*
* If a separate mechanism will be used to manage the changelog table, this
* method may be implemented as a no-op and always return zero. This is how
* the default implementation behaves.
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param maxAgeMillis
* the period of time (in milliseconds) after which a changelog table
* record should be deleted
* @return the number of rows that were deleted from the changelog table
* @throws SQLException
* if there is an error purging records from the changelog table
*/
@Override
public int cleanupChangelog(final TransactionContext ctx,
final long maxAgeMillis) throws SQLException
{
// Ask the database for its current time instead of using the local clock,
// so the cut-off is correct even if the two system clocks disagree.
long currentTimeMillis;
final PreparedStatement timeStmt =
ctx.prepareStatement("SELECT CURRENT_TIMESTAMP");
try
{
// executeQuery() runs inside the try, so the statement is still closed
// if the query itself fails.
final ResultSet rset = timeStmt.executeQuery();
try
{
// Treat a missing row and a SQL NULL value the same way: the database
// time cannot be determined.
final Timestamp dbNow = rset.next() ? rset.getTimestamp(1) : null;
if (dbNow == null)
{
throw new SQLException(
"Cannot determine current timestamp on database.");
}
currentTimeMillis = dbNow.getTime();
}
finally
{
rset.close();
}
}
finally
{
timeStmt.close();
}

// Delete every changelog record older than maxAgeMillis, measured against
// the database clock obtained above. The statement is always closed, even
// if binding or executing the update fails.
final PreparedStatement deleteStmt = ctx.prepareStatement(
"DELETE FROM " + CHANGELOG_TABLE + " WHERE change_time < ?");
try
{
deleteStmt.setTimestamp(1, new Timestamp(currentTimeMillis - maxAgeMillis));
return deleteStmt.executeUpdate();
}
finally
{
deleteStmt.close();
}
}
/**
* A simple implementation of MonitorProvider that we can register with the
* server through the ServerContext object. This provides the current values
* of nextChangeNumberToRetrieve and lastCompletedChangeNumber.
*/
private class SimpleMonitorProvider extends MonitorProvider
{
/**
* Supplies a display name for this monitor provider. The provider is
* registered programmatically through the server context rather than
* configured as a stand-alone extension, so this name is not otherwise used.
*
* @return The name of this monitor provider.
*/
@Override
public String getExtensionName()
{
final String providerName = "SimpleMonitorProvider";
return providerName;
}
/**
* Supplies documentation paragraphs for this monitor provider. None are
* offered, because the provider is registered through the server context
* rather than deployed as a stand-alone, documented extension.
*
* @return Always {@code null}, meaning that no description is available.
*/
@Override
public String[] getExtensionDescription()
{
final String[] noDescription = null;
return noDescription;
}
/**
* Names this monitor provider instance. The value becomes the naming
* attribute of the generated monitor entry and must be unique among all
* monitor provider instances.
*
* @return The instance name of this monitor provider.
*/
@Override
public String getMonitorInstanceName()
{
final String instanceName = "ExampleJDBCSyncSource Monitor";
return instanceName;
}
/**
* Names the object class used for monitor entries produced by this provider.
* The class does not need to be defined in the server schema.
*
* @return The object class name for generated monitor entries.
*/
@Override
public String getMonitorObjectClass()
{
final String objectClassName = "jdbc-sync-source-monitor-entry";
return objectClassName;
}
/**
* Retrieves a list of attributes containing the data to include in the
* monitor entry generated from this monitor provider.
*
* @return A list of attributes containing the data to include in the
* monitor entry generated from this monitor provider.
*/
@Override
public Listoperation.getDatabaseChangeRecord()
.
* This is what is returned by
* {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
* and also what comes out of
* {@link #listAllEntries(TransactionContext, String, BlockingQueue)}.
* @return a full LDAP Entry, or null if no such entry exists.
* @throws SQLException
* if there is an error fetching the entry
*/
@Override
public Entry fetchEntry(final TransactionContext ctx,
final SyncOperation operation)
throws SQLException
{
//Create a map of all the identity key/value pairs (delimited by '%%') that
//make up the identifiable info. In this single-table example, the
//identifier is always "uid={uid}". This comes from the 'identifier' column
//in the changelog table. It is up to the trigger code for each table to
//construct this.
DatabaseChangeRecord changeRecord = operation.getDatabaseChangeRecord();
DN id = changeRecord.getIdentifiableInfo();
MapmaxChanges
. The
* numStillPending
reference should be set to the estimated
* number of changes that haven't yet been retrieved from the changelog table
* when this method returns, or zero if all the current changes have been
* retrieved.
* entryType
is user-defined; it will be
* passed in on the command line for resync. The outputQueue
* should contain {@link DatabaseChangeRecord} objects with the
* ChangeType
set to resync.
*
*
* If the total set of entries is very large, it is fine to split up the work
* into multiple database queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
* inputLines
(e.g. the content of the file)
* is user-defined; it may be key/value pairs, primary keys, or full SQL
* statements, for example. The use of this method is triggered via the
* --sourceInputFile argument on the resync CLI. The
* outputQueue
should contain {@link DatabaseChangeRecord}
* objects with the ChangeType
set to resync.
*
*
* If the total set of entries is very large, it is fine to split up the work
* into multiple database queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
* maxAgeMillis
milliseconds old.
*