/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * docs/licenses/cddl.txt * or http://www.opensource.org/licenses/cddl1.php. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * docs/licenses/cddl.txt. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2010-2024 Ping Identity Corporation */ package com.unboundid.directory.sdk.examples; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicLong; import java.io.Serializable; import com.unboundid.ldap.sdk.Attribute; import com.unboundid.ldap.sdk.Entry; import com.unboundid.ldap.sdk.DN; import com.unboundid.util.args.ArgumentParser; import com.unboundid.util.args.ArgumentException; import com.unboundid.util.StaticUtils; import com.unboundid.directory.sdk.sync.api.JDBCSyncSource; import com.unboundid.directory.sdk.sync.config.JDBCSyncSourceConfig; import com.unboundid.directory.sdk.sync.types.SyncServerContext; import com.unboundid.directory.sdk.sync.types.SetStartpointOptions; import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord; import com.unboundid.directory.sdk.sync.types.DatabaseChangeRecord.ChangeType; import com.unboundid.directory.sdk.sync.types.TransactionContext; import com.unboundid.directory.sdk.sync.types.SyncOperation; import com.unboundid.directory.sdk.sync.util.ScriptUtils; import com.unboundid.directory.sdk.common.api.MonitorProvider; import com.unboundid.directory.sdk.common.types.LogSeverity; import com.unboundid.directory.sdk.common.types.RegisteredMonitorProvider; /** * This class provides a simple example of a JDBC Sync Source which will detect * changes from a single database table and return them to the * Data Sync Server so that they can be synchronized to a destination * endpoint. This implementation does not use any configuration arguments. * <p> * This example also demonstrates how to register an ad-hoc monitor provider * with the server, without having to add any server-side configuration. */ public final class ExampleJDBCSyncSource extends JDBCSyncSource { //The server context which can be used for obtaining the server state, //logging, etc. private SyncServerContext serverContext; //The name of the data table. private static final String DATA_TABLE = "DataTable"; //The name of the changelog table. private static final String CHANGELOG_TABLE = "ChangeLog"; //Handle to the monitor provider registered with the server. private RegisteredMonitorProvider monitorProvider; //Used to keep track of which changes have been retrieved. private volatile long nextChangeNumberToRetrieve; //Used to keep track of which changes have finished processing. //NOTE: this is the official "startpoint" for this implementation. private volatile long lastCompletedChangeNumber; /** * Retrieves a human-readable name for this extension. * * @return A human-readable name for this extension. */ @Override public String getExtensionName() { return "Example JDBC Sync Source"; } /** * Retrieves a human-readable description for this extension. Each element * of the array that is returned will be considered a separate paragraph in * generated documentation. * * @return A human-readable description for this extension, or {@code null} * or an empty array if no description should be available. */ @Override public String[] getExtensionDescription() { return new String[] { "This extension implements a JDBC Sync Source which can be used " + "to detect and synchronize changes from a single data table. " + "For the sake of simplicity, the columns in the table have " + "LDAP attribute names (cn, sn, givenname, etc). The database " + "uses a changelog table to capture changes, and this extension " + "is configured to manage it." }; } /** * Updates the provided argument parser to define any configuration arguments * which may be used by this extension. The argument parser may also be * updated to define relationships between arguments (e.g. to specify * required, exclusive, or dependent argument sets). * * @param parser The argument parser to be updated with the configuration * arguments which may be used by this extension. * * @throws ArgumentException If a problem is encountered while updating the * provided argument parser. */ @Override public void defineConfigArguments(final ArgumentParser parser) throws ArgumentException { // No arguments will be allowed by default. } /** * This hook is called when a Sync Pipe first starts up, when the * <i>resync</i> process first starts up, or when the set-startpoint * subcommand is called from the <i>realtime-sync</i> command line tool. * Any initialization of this sync source should be performed here. This * method should generally store the {@link SyncServerContext} in a class * member so that it can be used elsewhere in the implementation. * * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param serverContext A handle to the server context for the server in * which this extension is running. * @param config The general configuration for this sync source. * @param parser The argument parser which has been initialized from * the configuration for this JDBC sync source. */ @Override public void initializeJDBCSyncSource(final TransactionContext ctx, final SyncServerContext serverContext, final JDBCSyncSourceConfig config, final ArgumentParser parser) { this.serverContext = serverContext; this.monitorProvider = serverContext.registerMonitorProvider( new SimpleMonitorProvider(), config); } /** * This hook is called when a Sync Pipe shuts down, when the <i>resync</i> * process shuts down, or when the set-startpoint subcommand (from the * <i>realtime-sync</i> command line tool) is finished. Any clean up of this * sync source should be performed here. * * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. */ @Override public void finalizeJDBCSyncSource(final TransactionContext ctx) { serverContext.deregisterMonitorProvider(monitorProvider); } /** * This method should effectively set the starting point for synchronization * to the place specified by the <code>options</code> parameter. This should * cause all changes previous to the specified start point to be disregarded * and only changes after that point to be returned by * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)}. * <p> * There are several different startpoint types (see * {@link SetStartpointOptions}), and this implementation is not required to * support them all. If the specified startpoint type is unsupported, this * method should throw an {@link IllegalArgumentException}. * <p> * <b>IMPORTANT</b>: The <code>RESUME_AT_SERIALIZABLE</code> startpoint type * must be supported by your implementation, because this is used when a Sync * Pipe first starts up. * <p> * This method can be called from two different contexts: * <ul> * <li>When the 'set-startpoint' subcommand of the realtime-sync CLI is used * (the Sync Pipe is required to be stopped in this context)</li> * <li>Immediately after a connection is first established to the source * server (e.g. before the first call to * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)})</li> * </ul> * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param options * an object which indicates where exactly to start synchronizing * (e.g. the end of the changelog, specific change number, a certain * time ago, etc) * @throws SQLException * if there is any error while setting the start point */ @Override public void setStartpoint(final TransactionContext ctx, final SetStartpointOptions options) throws SQLException { switch(options.getStartpointType()) { case BEGINNING_OF_CHANGELOG: lastCompletedChangeNumber = 0; nextChangeNumberToRetrieve = 0; break; case END_OF_CHANGELOG: PreparedStatement stmt = ctx.prepareStatement( "SELECT MAX(change_number) AS value FROM " + CHANGELOG_TABLE); ResultSet rset = stmt.executeQuery(); try { long value = 0; if(rset.next()) { value = rset.getLong("value"); } else { String msg = "Could not find max change number"; serverContext.logMessage(LogSeverity.SEVERE_ERROR, msg); throw new SQLException(msg); } lastCompletedChangeNumber = value; nextChangeNumberToRetrieve = value + 1; } finally { rset.close(); stmt.close(); } break; case RESUME_AT_CHANGE_NUMBER: nextChangeNumberToRetrieve = options.getChangeNumber(); lastCompletedChangeNumber = nextChangeNumberToRetrieve - 1; break; case RESUME_AT_SERIALIZABLE: //When sync first starts up, this method is //called with this StartpointType to //initialize the internal state. Serializable token = options.getSerializableValue(); if(token != null) { lastCompletedChangeNumber = (Long) token; nextChangeNumberToRetrieve = lastCompletedChangeNumber + 1; } break; default: throw new IllegalArgumentException( "This startpoint type is not supported: " + options.getStartpointType().toString()); } } /** * Gets the current value of the startpoint for change detection. This is the * "bookmark" which indicates which changes have already been processed and * which have not. In most cases, a change number is used to detect changes * and is managed by the Data Sync Server, in which case this * implementation needs only to return the latest acknowledged * change number. In other cases, the return value may correspond to a * different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. * In any case, this method should return the value that is updated by * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}. * <p> * This method is called periodically and the return value is saved in the * persistent state for the Sync Pipe that uses this extension as its Sync * Source. * <p> * <b>IMPORTANT</b>: The internal value for the startpoint should only be * updated after a sync operation is acknowledged back to this extension (via * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). * Otherwise it will be possible for changes to be missed when the * Data Sync Server is restarted or a connection error occurs. * @return a value to store in the persistent state for the Sync Pipe. This is * usually a change number, but if a changelog table is not used to * detect changes, this value should represent some other token to * pass into * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} * when the sync pipe starts up. */ @Override public Serializable getStartpoint() { return Long.valueOf(lastCompletedChangeNumber); } /** * Return a full source entry (in LDAP form) from the database, corresponding * to the {@link DatabaseChangeRecord} that is passed in through the * {@link SyncOperation}. This method should perform any queries necessary to * gather the latest values for all the attributes to be synchronized. * <p> * This method <b>must be thread safe</b>, as it will be called repeatedly and * concurrently by each of the Sync Pipe worker threads as they process * entries. * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param operation * the SyncOperation which identifies the database "entry" to * fetch. The DatabaseChangeRecord can be obtained by calling * <code>operation.getDatabaseChangeRecord()</code>. * This is what is returned by * {@link #getNextBatchOfChanges(TransactionContext, int, AtomicLong)} * and also what comes out of * {@link #listAllEntries(TransactionContext, String, BlockingQueue)}. * @return a full LDAP Entry, or null if no such entry exists. * @throws SQLException * if there is an error fetching the entry */ @Override public Entry fetchEntry(final TransactionContext ctx, final SyncOperation operation) throws SQLException { //Create a map of all the identity key/value pairs (delimited by '%%') that //make up the identifiable info. In this single-table example, the //identifier is always "uid={uid}". This comes from the 'identifier' column //in the changelog table. It is up to the trigger code for each table to //construct this. DatabaseChangeRecord changeRecord = operation.getDatabaseChangeRecord(); DN id = changeRecord.getIdentifiableInfo(); Map<String,String> keyAndValue = ScriptUtils.dnToMap(id); if (! keyAndValue.containsKey("uid")) { throw new SQLException("No value found for uid in " + changeRecord); } Entry entry; String entryType = changeRecord.getEntryType(); if(entryType.equalsIgnoreCase("person")) { long uid = Long.valueOf(keyAndValue.get("uid")); String sql = "SELECT * FROM " + DATA_TABLE + " WHERE uid = ?"; PreparedStatement stmt = ctx.prepareStatement(sql); try { stmt.setLong(1, uid); //Create an entry using the literal column names as attribute names entry = ctx.searchToRawEntry(stmt, "uid"); } finally { stmt.close(); } } else { throw new IllegalArgumentException("Unknown entry type: " + entryType); } return entry; } /** * Provides a way for the Data Sync Server to acknowledge back to the * extension which sync operations it has processed. This method should update * the official startpoint which was set by * {@link #setStartpoint(TransactionContext, SetStartpointOptions)} and is * returned by {@link #getStartpoint()}. * <p> * <b>IMPORTANT</b>: The internal value for the startpoint should only be * updated after a sync operation is acknowledged back to this extension (via * this method). Otherwise it will be possible for changes to be missed when * the Data Sync Server is restarted or a connection error occurs. * <p> * A {@link TransactionContext} is provided in case the acknowledgment needs * to make it all the way back to the database itself (for example if you were * using Oracle's Change Data Capture). * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param completedOps * a list of {@link SyncOperation}s that have finished processing. * The records are listed in the order they were first detected. * @throws SQLException * if there is an error acknowledging the changes back to the * database */ @Override public void acknowledgeCompletedOps( final TransactionContext ctx, final LinkedList<SyncOperation> completedOps) throws SQLException { if(!completedOps.isEmpty()) { //Update lastCompletedChangeNumber to that of the last completed operation DatabaseChangeRecord last = completedOps.getLast().getDatabaseChangeRecord(); lastCompletedChangeNumber = last.getChangeNumber(); } } /** * Return the next batch of change records from the database. Change records * are just hints that a change happened; they do not include the actual data * of the change. In an effort to never synchronize stale data, the * Data Sync Server will go back and fetch the full source entry for * each change record. * <p> * On the first invocation, this should return changes starting from the * startpoint that was set by * {@link #setStartpoint(TransactionContext, SetStartpointOptions)}. This * method is responsible for updating the internal state such that subsequent * invocations do not return duplicate changes. * <p> * The resulting list should be limited by <code>maxChanges</code>. The * <code>numStillPending</code> reference should be set to the estimated * number of changes that haven't yet been retrieved from the changelog table * when this method returns, or zero if all the current changes have been * retrieved. * <p> * <b>IMPORTANT</b>: While this method needs to keep track of which changes * have already been returned so that it does not return them again, it should * <b>NOT</b> modify the official startpoint. The internal value for the * startpoint should only be updated after a sync operation is acknowledged * back to this extension (via * {@link #acknowledgeCompletedOps(TransactionContext, LinkedList)}). * Otherwise it will be possible for changes to be missed when the * Data Sync Server is restarted or a connection error occurs. The * startpoint should not change as a result of this method. * <p> * This method <b>does not need to be thread-safe</b>. It will be invoked * repeatedly by a single thread, based on the polling interval set in the * Sync Pipe configuration. * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param maxChanges * the maximum number of changes to retrieve * @param numStillPending * this should be set to the number of unretrieved changes that * are still pending after this batch has been retrieved. This will * be passed in as zero, and may be left that way if the actual value * cannot be determined. * @return a list of {@link DatabaseChangeRecord} instances, each * corresponding to a row in the changelog table (or the equivalent if * some other change tracking mechanism is being used). If there are * no new changes to return, this method should return an empty list. * @throws SQLException * if there is any error while retrieving the next batch of changes */ @Override public List<DatabaseChangeRecord> getNextBatchOfChanges( final TransactionContext ctx, final int maxChanges, final AtomicLong numStillPending) throws SQLException { List<DatabaseChangeRecord> results = new ArrayList<DatabaseChangeRecord>(); PreparedStatement stmt = ctx.prepareStatement( "SELECT * FROM " + CHANGELOG_TABLE + " WHERE change_number >= ? ORDER BY change_number ASC"); //This is a generic way to limit the size of the result set; however, this //may or may not be implemented by the JDBC driver to alter the query //itself. It may just allow the full set of results to come back and then //perform the truncation within the JVM, which can cause out of memory //errors. Most databases have a much more efficient mechansim (for example //"SELECT TOP(1000)..." in SQLServer and "SELECT ... WHERE ROWNUM < 1000" //in Oracle). stmt.setMaxRows(maxChanges); stmt.setLong(1, nextChangeNumberToRetrieve); ResultSet rset = stmt.executeQuery(); while(rset.next()) { if(results.size() >= maxChanges) { serverContext.debugError( "The result set contained too many rows; expected no more than " + maxChanges); break; } //In this case there is a change_type column in the changelog table which //gives us the change type ChangeType type = ChangeType.valueOf(rset.getString("change_type")); DatabaseChangeRecord.Builder bldr = new DatabaseChangeRecord.Builder( type, rset.getString("identifier")); long changeNum = rset.getLong("change_number"); //Update nextChangeNumberToRetrieve so that the next call will get the //next batch nextChangeNumberToRetrieve = changeNum + 1; bldr.changeNumber(changeNum); bldr.tableName(rset.getString("table_name")); String entryType = rset.getString("entry_type"); if(!entryType.equalsIgnoreCase("person")) { //This sync source only handles the "person" entry type serverContext.debugInfo("Skipping change with entry type: " + entryType); continue; } bldr.entryType(entryType); //Get the list of changed columns for this change. For UPDATE operations, //the Data Sync Server will only modify the destination //attributes that depend on the originally changed source columns (i.e. //if this is not set, no modifications will take place to the destination //entry). String cols = rset.getString("changed_columns"); bldr.changedColumns(cols != null ? cols.split(",") : null); //Get the database user who made the change bldr.modifier(rset.getString("modifiers_name")); //Get the timestamp of the change bldr.changeTime(rset.getTimestamp("change_time").getTime()); results.add(bldr.build()); } rset.close(); stmt.close(); //Figure out how many changes are still unretrieved at this point stmt = ctx.prepareStatement("SELECT COUNT(*) FROM " + CHANGELOG_TABLE + " WHERE change_number >= ?"); stmt.setLong(1, nextChangeNumberToRetrieve); rset = stmt.executeQuery(); if(rset.next()) { long stillPending = rset.getLong(1); numStillPending.set(stillPending); } rset.close(); stmt.close(); return results; } /** * Gets a list of all the entries in the database for a given entry type. This * is used by the 'resync' command line tool. The default implementation * throws a {@link UnsupportedOperationException}; subclasses should override * if the resync functionality is needed. * <p> * The <code>entryType</code> is user-defined; it will be * passed in on the command line for resync. The <code>outputQueue</code> * should contain {@link DatabaseChangeRecord} objects with the * <code>ChangeType</code> set to <i>resync</i>. * <p> * This method should not return until all the entries of the given entryType * have been added to the output queue. Separate threads will concurrently * drain entries from the queue and process them. (The queue should not * actually contain full entries, but rather DatabaseChangeRecord objects * which identify the full database entries. These objects are then * individually passed in to * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, * it is important to make sure that the DatabaseChangeRecord instances * contain enough identifiable information (e.g. primary keys) for each entry * so that the entry can be found again. * <p> * The lifecycle of resync is similar to that of real-time sync, with a few * differences: * <ol> * <li>Stream out a list of all IDs in the database (for a given entryType) * </li> * <li>Fetch full source entry for an ID</li> * <li>Perform any mappings and compute the equivalent destination entry</li> * <li>Fetch full destination entry</li> * <li>Diff the computed destination entry and actual destination entry</li> * <li>Apply the minimal set of changes at the destination to bring it in sync * </li> * </ol> * If the total set of entries is very large, it is fine to split up the work * into multiple database queries within this method. The queue will not grow * out of control because it blocks when it becomes full. The queue capacity * is fixed at 1000. * <p> * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param entryType * the type of database entry to be fetched (this is specified * on the CLI for the resync command) * @param outputQueue * a queue of DatabaseChangeRecord objects which will be individually * fetched via * {@link #fetchEntry(TransactionContext, SyncOperation)} * @throws SQLException * if there is an error retrieving the list of entries to resync */ @Override public void listAllEntries( final TransactionContext ctx, final String entryType, final BlockingQueue<DatabaseChangeRecord> outputQueue) throws SQLException { serverContext.debugInfo("Beginning to dump all entries..."); if(entryType.equalsIgnoreCase("person")) { //Get a full list of the UIDs PreparedStatement stmt = ctx.prepareStatement( "SELECT uid FROM " + DATA_TABLE + " ORDER BY uid ASC"); ResultSet rset = stmt.executeQuery(); while(rset.next()) { long uid = rset.getLong("uid"); DatabaseChangeRecord.Builder bldr = new DatabaseChangeRecord.Builder(ChangeType.resync, "uid=" + uid); //set the entry type so that fetchEntry() can use it bldr.entryType(entryType); try { outputQueue.put(bldr.build()); } catch(InterruptedException e) { serverContext.logMessage(LogSeverity.SEVERE_ERROR, StaticUtils.getExceptionMessage(e)); rset.close(); stmt.close(); throw new SQLException("The outputQueue was interrupted.", e); } } rset.close(); stmt.close(); } else { throw new IllegalArgumentException("Unknown entry type: " + entryType); } } /** * Gets a list of all the entries in the database from a given file input. * This is used by the 'resync' command line tool. The default implementation * throws a {@link UnsupportedOperationException}; subclasses should override * if the resync functionality is needed for specific database records, which * can be specified in the input file. * <p> * The format for the <code>inputLines</code> (e.g. the content of the file) * is user-defined; it may be key/value pairs, primary keys, or full SQL * statements, for example. The use of this method is triggered via the * <i>--sourceInputFile</i> argument on the resync CLI. The * <code>outputQueue</code> should contain {@link DatabaseChangeRecord} * objects with the <code>ChangeType</code> set to <i>resync</i>. * <p> * This method should not return until all the entries specified by the input * file have been added to the output queue. Separate threads will * concurrently drain entries from the queue and process them. (The queue * should not actually contain full entries, but rather DatabaseChangeRecord * objects which identify the full database entries. These objects are then * individually passed in to * {@link #fetchEntry(TransactionContext, SyncOperation)}. Therefore, * it is important to make sure that the DatabaseChangeRecord instances * contain enough identifiable information (e.g. primary keys) for each entry * so that the entry can be found again. * <p> * The lifecycle of resync is similar to that of real-time sync, with a few * differences: * <ol> * <li>Stream out a list of all IDs in the database (using the given input * file)</li> * <li>Fetch full source entry for an ID</li> * <li>Perform any mappings and compute the equivalent destination entry</li> * <li>Fetch full destination entry</li> * <li>Diff the computed destination entry and actual destination entry</li> * <li>Apply the minimal set of changes at the destination to bring it in sync * </li> * </ol> * If the total set of entries is very large, it is fine to split up the work * into multiple database queries within this method. The queue will not grow * out of control because it blocks when it becomes full. The queue capacity * is fixed at 1000. * <p> * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param entryType * the type of database entry to be fetched (this is specified * on the CLI for the resync command) * @param inputLines * an Iterator containing the lines from the specified input file to * resync (this is specified on the CLI for the resync command). * These lines can be any format, for example a set of primary keys, * a set of WHERE clauses, a set of full SQL queries, etc. * @param outputQueue * a queue of DatabaseChangeRecord objects which will be individually * fetched via {@link #fetchEntry(TransactionContext, SyncOperation)} * @throws SQLException * if there is an error retrieving the list of entries to resync */ @Override public void listAllEntries( final TransactionContext ctx, final String entryType, final Iterator<String> inputLines, final BlockingQueue<DatabaseChangeRecord> outputQueue) throws SQLException { if(!entryType.equalsIgnoreCase("person")) { throw new IllegalArgumentException("Unknown entry type: " + entryType); } while(inputLines.hasNext()) { String uid = inputLines.next().trim(); if(uid.isEmpty()) { continue; } DatabaseChangeRecord.Builder bldr = new DatabaseChangeRecord.Builder( ChangeType.resync, "uid=" + uid); //Set the entry type so that fetchEntry() can use it bldr.entryType(entryType); try { outputQueue.put(bldr.build()); } catch(InterruptedException e) { serverContext.logMessage(LogSeverity.SEVERE_ERROR, StaticUtils.getExceptionMessage(e)); throw new SQLException("The outputQueue was interrupted.", e); } } } /** * Performs a cleanup of the changelog table (if desired). There is a * background thread that periodically invokes this method. It should remove * any rows in the changelog table that are more than * <code>maxAgeMillis</code> milliseconds old. * <p> * <b>NOTE:</b> If the system clock on the database server is not in sync with * the system clock on the Data Sync Server, this method should query * the database for its current time in order to determine the cut-off point * for deleting changelog records. * <p> * If a separate mechanism will be used to manage the changelog table, this * method may be implemented as a no-op and always return zero. This is how * the default implementation behaves. * @param ctx * a TransactionContext which provides a valid JDBC connection to the * database. * @param maxAgeMillis * the period of time (in milliseconds) after which a changelog table * record should be deleted * @return the number of rows that were deleted from the changelog table * @throws SQLException * if there is an error purging records from the changelog table */ @Override public int cleanupChangelog(final TransactionContext ctx, final long maxAgeMillis) throws SQLException { //get current time on database PreparedStatement stmt = ctx.prepareStatement("SELECT CURRENT_TIMESTAMP"); ResultSet rset = stmt.executeQuery(); long currentTimeMillis; try { if(rset.next()) { currentTimeMillis = rset.getTimestamp(1).getTime(); } else { throw new SQLException( "Cannot determine current timestamp on database."); } } finally { rset.close(); stmt.close(); } stmt = ctx.prepareStatement( "DELETE FROM " + CHANGELOG_TABLE + " WHERE change_time < ?"); stmt.clearWarnings(); stmt.setTimestamp(1, new Timestamp(currentTimeMillis - maxAgeMillis)); int rowCount = stmt.executeUpdate(); stmt.close(); return rowCount; } /** * A simple implementation of MonitorProvider that we can register with the * server through the ServerContext object. This provides the current values * of nextChangeNumberToRetrieve and lastCompletedChangeNumber. */ private class SimpleMonitorProvider extends MonitorProvider { /** * Retrieves a human-readable name for this extension. This is not used * since we are registering this MonitorProvider through the ServerContext * object (e.g. it is not a stand-alone extension). * * @return A human-readable name for this extension. */ @Override public String getExtensionName() { return "SimpleMonitorProvider"; } /** * Retrieves a human-readable description for this extension. Each element * of the array that is returned will be considered a separate paragraph in * generated documentation. This is not used since we are registering this * MonitorProvider through the ServerContext object (e.g. it is not a * stand-alone extension). * * @return A human-readable description for this extension, or {@code null} * or an empty array if no description should be available. */ @Override public String[] getExtensionDescription() { return null; } /** * Retrieves the name that identifies this monitor provider instance. It * will be used as the value of the naming attribute for monitor entries. * Each monitor provider instance must have a unique name. * * @return The name that identifies this monitor provider instance. */ @Override public String getMonitorInstanceName() { return "ExampleJDBCSyncSource Monitor"; } /** * Retrieves the name of the object class that will be used for monitor * entries created from this monitor provider. This does not need to be * defined in the server schema. It may be {@code null} if a default object * class should be used. * * @return The name of the object class that will be used for monitor * entries created from this monitor provider. */ @Override public String getMonitorObjectClass() { return "jdbc-sync-source-monitor-entry"; } /** * Retrieves a list of attributes containing the data to include in the * monitor entry generated from this monitor provider. * * @return A list of attributes containing the data to include in the * monitor entry generated from this monitor provider. */ @Override public List<Attribute> getMonitorAttributes() { List<Attribute> attributes = new ArrayList<Attribute>(2); attributes.add(new Attribute("next-changenumber-to-retrieve", String.valueOf(nextChangeNumberToRetrieve))); attributes.add(new Attribute("last-completed-change-number", String.valueOf(lastCompletedChangeNumber))); return attributes; } } }