/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* docs/licenses/cddl.txt
* or http://www.opensource.org/licenses/cddl1.php.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* docs/licenses/cddl.txt. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2010-2016 UnboundID Corp.
*/
package com.unboundid.directory.sdk.examples.groovy;
import java.sql.*;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.io.Serializable;
import com.unboundid.ldap.sdk.Entry;
import com.unboundid.ldap.sdk.DN;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions;
import com.unboundid.directory.sdk.sync.types.SetStartpointOptions.StartpointType;
import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord;
import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord.ChangeType;
import com.unboundid.directory.sdk.sync.types.TransactionContext;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.scripting.ScriptedJDBCSyncSource;
import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig;
import com.unboundid.directory.sdk.sync.util.ScriptUtils;
import com.unboundid.directory.sdk.common.api.MonitorProvider;
import com.unboundid.directory.sdk.common.types.LogSeverity;
import com.unboundid.directory.sdk.common.types.RegisteredMonitorProvider;
/**
* This class implements the necessary methods to synchronize data from a
* simple, single-table database schema to its LDAP counterpart.
* <p>
* To use this script, place it under
* /lib/groovy-scripted-extensions/com/unboundid/directory/sdk/examples/groovy
* and set the 'script-class' property on the Sync Source to
* "com.unboundid.directory.sdk.examples.groovy.ExampleJDBCSyncSource".
* <p>
* This example also demonstrates how to register an ad-hoc monitor provider
* with the server, without having to add any server-side configuration.
*/
public final class ExampleScriptedJDBCSyncSource extends ScriptedJDBCSyncSource
{
//The server context which can be used for obtaining the server state, logging, etc.
private SyncServerContext serverContext;
//The name of the data table.
private static final String DATA_TABLE = "DataTable";
//The name of the changelog table.
private static final String CHANGELOG_TABLE = "ChangeLog";
//Handle to the monitor provider registered with the server.
private RegisteredMonitorProvider monitorProvider;
//Used to keep track of which changes have been retrieved.
private long nextChangeNumberToRetrieve;
//Used to keep track of which changes have finished processing.
//NOTE: this is the official "startpoint" for this implementation.
private long lastCompletedChangeNumber;
/**
* Updates the provided argument parser to define any configuration arguments
* which may be used by this extension. The argument parser may also be
* updated to define relationships between arguments (e.g. to specify
* required, exclusive, or dependent argument sets).
*
* @param parser The argument parser to be updated with the configuration
* arguments which may be used by this extension.
*
* @throws ArgumentException If a problem is encountered while updating the
* provided argument parser.
*/
@Override
public void defineConfigArguments(final ArgumentParser parser)
throws ArgumentException
{
// No arguments will be allowed by default.
}
/**
* This hook is called when a Sync Pipe first starts up, when the
* <i>resync</i> process first starts up, or when the set-startpoint
* subcommand is called from the <i>realtime-sync</i> command line tool.
* Any initialization of this sync source should be performed here. This
* method should generally store the {@link SyncServerContext} in a class
* member so that it can be used elsewhere in the implementation.
* <p>
* The default implementation is empty.
*
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param serverContext A handle to the server context for the server in
* which this extension is running.
* @param config The general configuration for this sync source.
* @param parser The argument parser which has been initialized from
* the configuration for this JDBC sync source.
*/
@Override
public void initializeJDBCSyncSource(final TransactionContext ctx,
final SyncServerContext serverContext,
final JDBCSyncSourceConfig config,
final ArgumentParser parser)
{
this.serverContext = serverContext;
this.monitorProvider = serverContext.registerMonitorProvider(
new SimpleMonitorProvider(), config);
}
/**
* This hook is called when a Sync Pipe shuts down, when the <i>resync</i>
* process shuts down, or when the set-startpoint subcommand (from the
* <i>realtime-sync</i> command line tool) is finished. Any clean up of this
* sync source should be performed here.
*
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
*/
@Override
public void finalizeJDBCSyncSource(final TransactionContext ctx)
{
serverContext.deregisterMonitorProvider(monitorProvider);
}
/**
* This method should effectively set the starting point for synchronization
* to the place specified by the <code>options</code> parameter. This should
* cause all changes previous to the specified start point to be disregarded
* and only changes after that point to be returned by
* {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}.
* <p>
* There are several different startpoint types (see
* {@link SetStartpointOptions}), and this implementation is not required to
* support them all. If the specified startpoint type is unsupported, this
* method should throw an {@link IllegalArgumentException}.
* <p>
* <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type
* must be supported by your implementation, because this is used when a Sync
* Pipe first starts up.
* <p>
* This method can be called from two different contexts:
* <ul>
* <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used
* (the Sync Pipe is required to be stopped in this context)</li>
* <li>Immediately after a connection is first established to the source
* server (e.g. before the first call to
* {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li>
* </ul>
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param options
* an object which indicates where exactly to start synchronizing
* (e.g. the end of the changelog, specific change number, a certain
* time ago, etc)
* @throws SQLException
* if there is any error while setting the start point
*/
@Override
public void setStartpoint(final TransactionContext ctx, final SetStartpointOptions options)
throws SQLException
{
switch(options.getStartpointType())
{
case StartpointType.BEGINNING_OF_CHANGELOG:
lastCompletedChangeNumber = 0;
nextChangeNumberToRetrieve = 0;
break;
case StartpointType.END_OF_CHANGELOG:
PreparedStatement stmt = ctx.prepareStatement(
"SELECT MAX(change_number) AS value FROM " + CHANGELOG_TABLE);
ResultSet rset = stmt.executeQuery();
try
{
long value = 0;
if(rset.next())
{
value = rset.getLong("value");
}
else
{
String msg = "Could not find max change number";
serverContext.logMessage(LogSeverity.SEVERE_ERROR, msg);
throw new SQLException(msg);
}
lastCompletedChangeNumber = value;
nextChangeNumberToRetrieve = value + 1;
}
finally
{
rset.close();
stmt.close();
}
break;
case StartpointType.RESUME_AT_CHANGE_NUMBER:
nextChangeNumberToRetrieve = options.getChangeNumber();
lastCompletedChangeNumber = nextChangeNumberToRetrieve - 1;
break;
case StartpointType.RESUME_AT_SERIALIZABLE: //When sync first starts up, this method is
//called with this StartpointType to initialize
//the internal state.
Serializable token = options.getSerializableValue();
if(token != null)
{
lastCompletedChangeNumber = (Long) token;
nextChangeNumberToRetrieve = lastCompletedChangeNumber + 1;
}
break;
default:
throw new IllegalArgumentException("This startpoint type is not supported: " +
options.getStartpointType().toString());
}
}
/**
* Gets the current value of the startpoint for change detection. This is the
* "bookmark" which indicates which changes have already been processed and
* which have not. In most cases, a change number is used to detect changes
* and is managed by the Synchronization Server, in which case this
* implementation needs only to return the latest acknowledged
* change number. In other cases, the return value may correspond to a
* different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server.
* In any case, this method should return the value that is updated by
* {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}.
* <p>
* This method is called periodically and the return value is saved in the
* persistent state for the Sync Pipe that uses this script as its Sync
* Source.
* <p>
* <b>IMPORTANT</b>: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this script (via
* {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs.
* @return a value to store in the persistent state for the Sync Pipe. This is
* usually a change number, but if a changelog table is not used to
* detect changes, this value should represent some other token to
* pass into
* {@link #setStartpoint(TransactionContext, SetStartpointOptions)}
* when the sync pipe starts up.
*/
@Override
public Serializable getStartpoint()
{
return Long.valueOf(lastCompletedChangeNumber);
}
/**
* Return a full source entry (in LDAP form) from the database, corresponding
* to the {@link DatabaseChangeRecord} that is passed in through the
* {@link SyncOperation}. This method should perform any queries necessary to
* gather the latest values for all the attributes to be synchronized.
* <p>
* This method <b>must be thread safe</b>, as it will be called repeatedly and
* concurrently by each of the Sync Pipe worker threads as they process
* entries.
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param operation
* the SyncOperation which identifies the database "entry" to
* fetch. The DatabaseChangeRecord can be obtained by calling
* <code>operation.getDatabaseChangeRecord()</code>.
* This is what is returned by
* {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}
* and also what comes out of
* {@link #listAllEntries(TransactionContext, String, BlockingQueue)}
* .
* @return a full LDAP Entry, or null if no such entry exists.
* @throws SQLException
* if there is an error fetching the entry
*/
@Override
public Entry fetchEntry(final TransactionContext ctx, final SyncOperation operation)
throws SQLException
{
//Create a map of all the identity key/value pairs (delimited by '%%') that make
//up the identifiable info. In this single-table example, the identifier is always
//"uid={uid}". This comes from the 'identifier' column in the changelog
//table. It is up to the trigger code for each table to construct this.
DatabaseChangeRecord changeRecord = operation.getDatabaseChangeRecord();
DN id = changeRecord.getIdentifiableInfo();
Map<String,String> keyAndValue = ScriptUtils.dnToMap(id);
if(!keyAndValue.containsKey("uid"))
{
throw new IllegalArgumentException("Identifier is missing 'uid'");
}
Entry entry;
String entryType = changeRecord.getEntryType();
if(entryType.equalsIgnoreCase("person"))
{
long uid = Long.valueOf(keyAndValue.get("uid"));
String sql = "SELECT * FROM " + DATA_TABLE + " WHERE uid = ?";
PreparedStatement stmt = ctx.prepareStatement(sql);
try
{
stmt.setLong(1, uid);
//Create an entry using the literal column names as attribute names
entry = ctx.searchToRawEntry(stmt, "uid");
}
finally
{
stmt.close();
}
}
else
{
throw new IllegalArgumentException("Unknown entry type: " + entryType);
}
return entry;
}
/**
* Provides a way for the Data Sync Server to acknowledge back to the
* script which sync operations it has processed. This method should update
* the official startpoint which was set by
* {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is
* returned by {@link #getStartpoint()}.
* <p>
* <b>IMPORTANT</b>: The internal value for the startpoint should only be
* updated after a sync operation is acknowledged back to this script (via
* this method). Otherwise it will be possible for changes to be missed when
* the Data Sync Server is restarted or a connection error occurs.
* <p>
* A {@link TransactionContext} is provided in case the acknowledgment needs
* to make it all the way back to the database itself (for example if you were
* using Oracle's Change Data Capture).
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param completedOps
* a list of {@link SyncOperation}s that have finished processing.
* The records are listed in the order they were first detected.
* @throws SQLException
* if there is an error acknowledging the changes back to the
* database
*/
@Override
public void acknowledgeCompletedOps(final TransactionContext ctx,
final LinkedList<SyncOperation> completedOps)
throws SQLException
{
if(!completedOps.isEmpty())
{
//Update lastCompletedChangeNumber to that of the last completed operation
DatabaseChangeRecord last = completedOps.getLast().getDatabaseChangeRecord();
lastCompletedChangeNumber = last.getChangeNumber();
}
}
/**
* Return the next batch of change records from the database. Change records
* are just hints that a change happened; they do not include the actual data
* of the change. In an effort to never synchronize stale data, the
* Data Sync Server will go back and fetch the full source entry for
* each change record.
* <p>
* On the first invocation, this should return changes starting from the
* startpoint that was set by
* {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This
* method is responsible for updating the internal state such that subsequent
* invocations do not return duplicate changes.
* <p>
* The resulting list should be limited by <code>maxChanges</code>. The
* <code>numStillPending</code> reference should be set to the estimated
* number of changes that haven't yet been retrieved from the changelog table
* when this method returns, or zero if all the current changes have been
* retrieved.
* <p>
* <b>IMPORTANT</b>: While this method needs to keep track of which changes
* have already been returned so that it does not return them again, it should
* <b>NOT</b> modify the official startpoint. The internal value for the
* startpoint should only be updated after a sync operation is acknowledged
* back to this script (via
* {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}).
* Otherwise it will be possible for changes to be missed when the
* Data Sync Server is restarted or a connection error occurs. The
* startpoint should not change as a result of this method.
* <p>
* This method <b>does not need to be thread-safe</b>. It will be invoked
* repeatedly by a single thread, based on the polling interval set in the
* Sync Pipe configuration.
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param maxChanges
* the maximum number of changes to retrieve
* @param numStillPending
* this should be set to the number of unretrieved changes that
* are still pending after this batch has been retrieved. This will
* be passed in
* as zero, and may be left that way if the actual value cannot be
* determined.
* @return a list of {@link DatabaseChangeRecord} instances, each
* corresponding to a row in the changelog table (or the equivalent if
* some other change tracking mechanism is being used). If there are
* no new changes to return, this method should return an empty list.
* @throws SQLException
* if there is any error while retrieving the next batch of changes
*/
@Override
public List<DatabaseChangeRecord> getNextBatchOfChanges(final TransactionContext ctx,
final int maxChanges,
final AtomicLong numStillPending)
throws SQLException
{
List<DatabaseChangeRecord> results = new ArrayList<DatabaseChangeRecord>();
PreparedStatement stmt = ctx.prepareStatement(
"SELECT * FROM " + CHANGELOG_TABLE + " WHERE change_number >= ? ORDER BY" +
" change_number ASC");
//This is a generic way to limit the size of the result set; however, this may
//or may not be implemented by the JDBC driver to alter the query itself. It may
//just allow the full set of results to come back and then perform the truncation
//within the JVM, which can cause out of memory errors. Most databases have a
//much more efficient mechansim (for example "SELECT TOP(1000)..." in SQLServer
//and "SELECT ... WHERE ROWNUM < 1000" in Oracle).
stmt.setMaxRows(maxChanges);
stmt.setLong(1, nextChangeNumberToRetrieve);
ResultSet rset = stmt.executeQuery();
while(rset.next())
{
if(results.size() >= maxChanges)
{
serverContext.debugError("The result set contained too many rows; expected no more than " + maxChanges);
break;
}
//In this case there is a change_type column in the changelog table which gives us the change type
ChangeType type = ChangeType.valueOf(rset.getString("change_type"));
DatabaseChangeRecord.Builder bldr = new DatabaseChangeRecord.Builder(
type, rset.getString("identifier"));
long changeNum = rset.getLong("change_number");
//Update nextChangeNumberToRetrieve so that the next call will get the next batch
nextChangeNumberToRetrieve = changeNum + 1;
bldr.changeNumber(changeNum);
bldr.tableName(rset.getString("table_name"));
String entryType = rset.getString("entry_type");
if(!entryType.equalsIgnoreCase("person"))
{
//This sync source only handles the "person" entry type
serverContext.debugInfo("Skipping change with entry type: " + entryType);
continue;
}
bldr.entryType(entryType);
//Get the list of changed columns for this change. For UPDATE operations, the Data Sync Server
//will only modify the destination attributes that depend on the originally changed source
//columns (i.e. if this is not set, no modifications will take place on the destination entry).
String cols = rset.getString("changed_columns");
bldr.changedColumns(cols != null ? cols.split(",") : null);
//Get the database user who made the change
bldr.modifier(rset.getString("modifiers_name"));
//Get the timestamp of the change
bldr.changeTime(rset.getTimestamp("change_time").getTime());
results.add(bldr.build());
}
rset.close();
stmt.close();
//Figure out how many changes are still unretrieved at this point
stmt = ctx.prepareStatement("SELECT COUNT(*) FROM " + CHANGELOG_TABLE + " WHERE change_number >= ?");
stmt.setLong(1, nextChangeNumberToRetrieve);
rset = stmt.executeQuery();
if(rset.next())
{
long stillPending = rset.getLong(1);
numStillPending.set(stillPending);
}
rset.close();
stmt.close();
return results;
}
/**
* Gets a list of all the entries in the database for a given entry type. This
* is used by the 'resync' command line tool. The default implementation
* throws a {@link UnsupportedOperationException}; subclasses should override
* if the resync functionality is needed.
* <p>
* The <code>entryType</code> is user-defined; it will be
* passed in on the command line for resync. The <code>outputQueue</code>
* should contain {@link DatabaseChangeRecord} objects with the
* <code>ChangeType</code> set to <i>resync</i>.
* <p>
* This method should not return until all the entries of the given entryType
* have been added to the output queue. Separate threads will concurrently
* drain entries from the queue and process them. (The queue should not
* actually contain full entries, but rather DatabaseChangeRecord objects
* which identify the full database entries. These objects are then
* individually passed in to
* {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
* it is important to make sure that the DatabaseChangeRecord instances
* contain enough identifiable information (e.g. primary keys) for each entry
* so that the entry can be found again.
* <p>
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
* <ol>
* <li>Stream out a list of all IDs in the database (for a given entryType)
* </li>
* <li>Fetch full source entry for an ID</li>
* <li>Perform any mappings and compute the equivalent destination entry</li>
* <li>Fetch full destination entry</li>
* <li>Diff the computed destination entry and actual destination entry</li>
* <li>Apply the minimal set of changes at the destination to bring it in sync
* </li>
* </ol>
* If the total set of entries is very large, it is fine to split up the work
* into multiple database queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
* <p>
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param entryType
* the type of database entry to be fetched (this is specified
* on the CLI for the resync command)
* @param outputQueue
* a queue of DatabaseChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
* @throws SQLException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(final TransactionContext ctx,
final String entryType,
final BlockingQueue<DatabaseChangeRecord> outputQueue)
throws SQLException
{
serverContext.debugInfo("Beginning to dump all entries...");
if(entryType.equalsIgnoreCase("person"))
{
//Get a full list of the UIDs
PreparedStatement stmt = ctx.prepareStatement(
"SELECT uid FROM " + DATA_TABLE + " ORDER BY uid ASC");
ResultSet rset = stmt.executeQuery();
while(rset.next())
{
long uid = rset.getLong("uid");
DatabaseChangeRecord.Builder bldr =
new DatabaseChangeRecord.Builder(ChangeType.resync, "uid=" + uid);
//set the entry type so that fetchEntry() can use it
bldr.entryType(entryType);
outputQueue.put(bldr.build());
}
rset.close();
stmt.close();
}
else
{
throw new IllegalArgumentException("Unknown entry type: " + entryType);
}
}
/**
* Gets a list of all the entries in the database from a given file input.
* This is used by the 'resync' command line tool. The default implementation
* throws a {@link UnsupportedOperationException}; subclasses should override
* if the resync functionality is needed for specific database records, which
* can be specified in the input file.
* <p>
* The format for the <code>inputLines</code> (e.g. the content of the file)
* is user-defined; it may be key/value pairs, primary keys, or full SQL
* statements, for example. The use of this method is triggered via the
* <i>--sourceInputFile</i> argument on the resync CLI. The
* <code>outputQueue</code> should contain {@link DatabaseChangeRecord}
* objects with the <code>ChangeType</code> set to <i>resync</i>.
* <p>
* This method should not return until all the entries specified by the input
* file have been added to the output queue. Separate threads will
* concurrently drain entries from the queue and process them. (The queue
* should not actually contain full entries, but rather DatabaseChangeRecord
* objects which identify the full database entries. These objects are then
* individually passed in to
* {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore,
* it is important to make sure that the DatabaseChangeRecord instances
* contain enough identifiable information (e.g. primary keys) for each entry
* so that the entry can be found again.
* <p>
* The lifecycle of resync is similar to that of real-time sync, with a few
* differences:
* <ol>
* <li>Stream out a list of all IDs in the database (using the given input
* file)</li>
* <li>Fetch full source entry for an ID</li>
* <li>Perform any mappings and compute the equivalent destination entry</li>
* <li>Fetch full destination entry</li>
* <li>Diff the computed destination entry and actual destination entry</li>
* <li>Apply the minimal set of changes at the destination to bring it in sync
* </li>
* </ol>
* If the total set of entries is very large, it is fine to split up the work
* into multiple database queries within this method. The queue will not grow
* out of control because it blocks when it becomes full. The queue capacity
* is fixed at 1000.
* <p>
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param entryType
* the type of database entry to be fetched (this is specified
* on the CLI for the resync command)
* @param inputLines
* an Iterator containing the lines from the specified input file to
* resync (this is specified on the CLI for the resync command).
* These lines can be any format, for example a set of primary keys,
* a set of WHERE clauses, a set of full SQL queries, etc.
* @param outputQueue
* a queue of DatabaseChangeRecord objects which will be individually
* fetched via {@link #fetchEntry(TransactionContext, SyncOperation)}
* @throws SQLException
* if there is an error retrieving the list of entries to resync
*/
@Override
public void listAllEntries(
final TransactionContext ctx,
final String entryType,
final Iterator<String> inputLines,
final BlockingQueue<DatabaseChangeRecord> outputQueue)
throws SQLException
{
if(!entryType.equalsIgnoreCase("person"))
{
throw new IllegalArgumentException("Unknown entry type: " + entryType);
}
while(inputLines.hasNext())
{
String uid = inputLines.next().trim();
if(uid.isEmpty())
{
continue;
}
DatabaseChangeRecord.Builder bldr = new DatabaseChangeRecord.Builder(
ChangeType.resync, "uid=" + uid);
//Set the entry type so that fetchEntry() can use it
bldr.entryType(entryType);
outputQueue.put(bldr.build());
}
}
/**
* Performs a cleanup of the changelog table (if desired). There is a
* background thread that periodically invokes this method. It should remove
* any rows in the changelog table that are more than
* <code>maxAgeMillis</code> milliseconds old.
* <p>
* <b>NOTE:</b> If the system clock on the database server is not in sync with
* the system clock on the Data Sync Server, this method should query
* the database for its current time in order to determine the cut-off point
* for deleting changelog records.
* <p>
* If a separate mechanism will be used to manage the changelog table, this
* method may be implemented as a no-op and always return zero. This is how
* the default implementation behaves.
* @param ctx
* a TransactionContext which provides a valid JDBC connection to the
* database.
* @param maxAgeMillis
* the period of time (in milliseconds) after which a changelog table
* record should be deleted
* @return the number of rows that were deleted from the changelog table
* @throws SQLException
* if there is an error purging records from the changelog table
*/
@Override
public int cleanupChangelog(final TransactionContext ctx, final long maxAgeMillis) throws SQLException
{
//get current time on database
PreparedStatement stmt = ctx.prepareStatement("SELECT CURRENT_TIMESTAMP");
ResultSet rset = stmt.executeQuery();
long currentTimeMillis;
try
{
if(rset.next())
{
currentTimeMillis = rset.getTimestamp(1).getTime();
}
else
{
throw new SQLException("Cannot determine current timestamp on database.");
}
}
finally
{
rset.close();
stmt.close();
}
stmt = ctx.prepareStatement(
"DELETE FROM " + CHANGELOG_TABLE + " WHERE change_time < ?");
stmt.clearWarnings();
stmt.setTimestamp(1, new Timestamp(currentTimeMillis - maxAgeMillis));
int rowCount = stmt.executeUpdate();
stmt.close();
return rowCount;
}
/**
* A simple implementation of MonitorProvider that we can register with the
* server through the ServerContext object. This provides the current values
* of nextChangeNumberToRetrieve and lastCompletedChangeNumber.
*/
private class SimpleMonitorProvider extends MonitorProvider
{
/**
* Retrieves a human-readable name for this extension. This is not used
* since we are registering this MonitorProvider through the ServerContext
* object (e.g. it is not a stand-alone extension).
*
* @return A human-readable name for this extension.
*/
@Override
public String getExtensionName()
{
return "SimpleMonitorProvider";
}
/**
* Retrieves a human-readable description for this extension. Each element
* of the array that is returned will be considered a separate paragraph in
* generated documentation. This is not used since we are registering this
* MonitorProvider through the ServerContext object (e.g. it is not a
* stand-alone extension).
*
* @return A human-readable description for this extension, or {@code null}
* or an empty array if no description should be available.
*/
@Override
public String[] getExtensionDescription()
{
return null;
}
/**
* Retrieves the name that identifies this monitor provider instance. It
* will be used as the value of the naming attribute for monitor entries.
* Each monitor provider instance must have a unique name.
*
* @return The name that identifies this monitor provider instance.
*/
@Override
public String getMonitorInstanceName()
{
return "ExampleJDBCSyncSource Monitor";
}
/**
* Retrieves the name of the object class that will be used for monitor
* entries created from this monitor provider. This does not need to be
* defined in the server schema. It may be {@code null} if a default object
* class should be used.
*
* @return The name of the object class that will be used for monitor
* entries created from this monitor provider.
*/
@Override
public String getMonitorObjectClass()
{
return "jdbc-sync-source-monitor-entry";
}
/**
* Retrieves a list of attributes containing the data to include in the
* monitor entry generated from this monitor provider.
*
* @return A list of attributes containing the data to include in the
* monitor entry generated from this monitor provider.
*/
@Override
public List<Attribute> getMonitorAttributes()
{
List<Attribute> attributes = new ArrayList<Attribute>(2);
attributes.add(new Attribute("next-changenumber-to-retrieve",
String.valueOf(nextChangeNumberToRetrieve)));
attributes.add(new Attribute("last-completed-change-number",
String.valueOf(lastCompletedChangeNumber)));
return attributes;
}
}
}
|