Class ChangeDetector

  • All Implemented Interfaces:
    Configurable, ExampleUsageProvider, Reconfigurable<ChangeDetectorConfig>, UnboundIDExtension

    @Extensible
    @SynchronizationServerExtension(appliesToLocalContent=false,
                                    appliesToSynchronizedContent=true)
    public abstract class ChangeDetector
    extends java.lang.Object
    implements UnboundIDExtension, Reconfigurable<ChangeDetectorConfig>, ExampleUsageProvider
    This class defines an API that must be implemented by extensions that detect changes for an LDAP based Sync Source. A Change Detector can be used to
    • Process logs or other flat files for changes.
    • Process changes from a Queue (Kafka, RabbitMQ, etc)
    • Override the standard cn=changelog based approach for detecting changes.

    Configuring Change Detectors

    In order to configure a Change Detector created using this API, use a command like:
        dsconfig create-change-detector \
              --detector-name "{detector-name}" \
              --type third-party \
              --set "extension-class:{class-name}" \
              --set "extension-argument:{name=vale}"
     
    where "{plugin-name}" is the name to use for the Change Detector instance, "{class-name}" is the fully-qualified name of the Java class that extends com.unboundid.directory.sdk.sync.api.ChangeDetector, and "{name=value}" represents name-value pairs for any arguments to provide to the Change Detector. If multiple arguments should be provided to the Change Detector, then the "--set extension-argument:{name=value}" option should be provided multiple times.
    • Constructor Summary

      Constructors 
      Constructor Description
      ChangeDetector()
      Creates a new instance of this LDAP Change Detector.
    • Method Summary

      All Methods Instance Methods Abstract Methods Concrete Methods 
      Modifier and Type Method Description
      abstract void acknowledgeCompletedOps​(java.util.LinkedList<SyncOperation> completedOps)
      Provides a way for the Data Sync Server to acknowledge back to the extension which sync operations it has processed.
      ResultCode applyConfiguration​(ChangeDetectorConfig config, ArgumentParser parser, java.util.List<java.lang.String> adminActionsRequired, java.util.List<java.lang.String> messages)
      Attempts to apply the configuration from the provided argument parser to this extension.
      void defineConfigArguments​(ArgumentParser parser)
      Updates the provided argument parser to define any configuration arguments which may be used by this extension.
      void finalizeChangeDetector()
      This hook is called when a Sync Pipe shuts down or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished.
      java.util.Map<java.util.List<java.lang.String>,​java.lang.String> getExamplesArgumentSets()
      Retrieves a map containing examples of configurations that may be used for this extension.
      abstract java.lang.String[] getExtensionDescription()
      Retrieves a human-readable description for this extension.
      abstract java.lang.String getExtensionName()
      Retrieves a human-readable name for this extension.
      abstract java.util.List<ChangeRecord> getNextBatchOfChanges​(int maxChanges, java.util.concurrent.atomic.AtomicLong numStillPending)
      Return the next batch of change records from the source.
      abstract java.io.Serializable getStartpoint()
      Gets the current value of the startpoint for change detection.
      void initializeChangeDetector​(SyncServerContext serverContext, SyncSourceContext syncSourceContext, ArgumentParser parser)
      This hook is called when a Sync Pipe first starts up or when the set-startpoint subcommand is called from the realtime-sync command line tool.
      boolean isConfigurationAcceptable​(ChangeDetectorConfig config, ArgumentParser parser, java.util.List<java.lang.String> unacceptableReasons)
      Indicates whether the configuration represented by the provided argument parser is acceptable for use by this extension.
      abstract void setStartpoint​(SetStartpointOptions options)
      This method should effectively set the starting point for synchronization to the place specified by the options parameter.
      • Methods inherited from class java.lang.Object

        clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait
    • Constructor Detail

      • ChangeDetector

        public ChangeDetector()
        Creates a new instance of this LDAP Change Detector. All Change Detector implementations must include a default constructor, but any initialization should generally be done in the initializeChangeDetector method.
    • Method Detail

      • getExtensionDescription

        public abstract java.lang.String[] getExtensionDescription()
        Retrieves a human-readable description for this extension. Each element of the array that is returned will be considered a separate paragraph in generated documentation.
        Specified by:
        getExtensionDescription in interface UnboundIDExtension
        Returns:
        A human-readable description for this extension, or null or an empty array if no description should be available.
      • getExamplesArgumentSets

        public java.util.Map<java.util.List<java.lang.String>,​java.lang.String> getExamplesArgumentSets()
        Retrieves a map containing examples of configurations that may be used for this extension. The map key should be a list of sample arguments, and the corresponding value should be a description of the behavior that will be exhibited by the extension when used with that configuration.
        Specified by:
        getExamplesArgumentSets in interface ExampleUsageProvider
        Returns:
        A map containing examples of configurations that may be used for this extension. It may be null or empty if there should not be any example argument sets.
      • defineConfigArguments

        public void defineConfigArguments​(ArgumentParser parser)
                                   throws ArgumentException
        Updates the provided argument parser to define any configuration arguments which may be used by this extension. The argument parser may also be updated to define relationships between arguments (e.g., to specify required, exclusive, or dependent argument sets).
        Specified by:
        defineConfigArguments in interface Configurable
        Parameters:
        parser - The argument parser to be updated with the configuration arguments which may be used by this extension.
        Throws:
        ArgumentException - If a problem is encountered while updating the provided argument parser.
      • initializeChangeDetector

        public void initializeChangeDetector​(SyncServerContext serverContext,
                                             SyncSourceContext syncSourceContext,
                                             ArgumentParser parser)
        This hook is called when a Sync Pipe first starts up or when the set-startpoint subcommand is called from the realtime-sync command line tool. Any initialization of this change detector should be performed here. This method should generally store the SyncServerContext and SyncSourceContext in a class member so that it can be used elsewhere in the implementation.

        The default implementation is empty.

        Parameters:
        serverContext - A handle to the server context for the server in which this extension is running.
        syncSourceContext - An interface for interacting with the Sync Source that owns this Change Detector or null if the Change Detector is only being initialized to validate its configuration.
        parser - The argument parser which has been initialized from the configuration for this sync source.
      • finalizeChangeDetector

        public void finalizeChangeDetector()
        This hook is called when a Sync Pipe shuts down or when the set-startpoint subcommand (from the realtime-sync command line tool) is finished. Any clean up of this change detector should be performed here.

        The default implementation is empty.

      • isConfigurationAcceptable

        public boolean isConfigurationAcceptable​(ChangeDetectorConfig config,
                                                 ArgumentParser parser,
                                                 java.util.List<java.lang.String> unacceptableReasons)
        Indicates whether the configuration represented by the provided argument parser is acceptable for use by this extension. The parser will have been used to parse any configuration available for this extension, and any automatic validation will have been performed. This method may be used to perform any more complex validation which cannot be performed automatically by the argument parser.
        Specified by:
        isConfigurationAcceptable in interface Reconfigurable<ChangeDetectorConfig>
        Parameters:
        config - The general configuration for this extension.
        parser - The argument parser that has been used to parse the proposed configuration for this extension.
        unacceptableReasons - A list to which messages may be added to provide additional information about why the provided configuration is not acceptable.
        Returns:
        true if the configuration in the provided argument parser appears to be acceptable, or false if not.
      • applyConfiguration

        public ResultCode applyConfiguration​(ChangeDetectorConfig config,
                                             ArgumentParser parser,
                                             java.util.List<java.lang.String> adminActionsRequired,
                                             java.util.List<java.lang.String> messages)
        Attempts to apply the configuration from the provided argument parser to this extension.
        Specified by:
        applyConfiguration in interface Reconfigurable<ChangeDetectorConfig>
        Parameters:
        config - The general configuration for this extension.
        parser - The argument parser that has been used to parse the new configuration for this extension.
        adminActionsRequired - A list to which messages may be added to provide additional information about any additional administrative actions that may be required to apply some of the configuration changes.
        messages - A list to which messages may be added to provide additional information about the processing performed by this method.
        Returns:
        A result code providing information about the result of applying the configuration change. A result of SUCCESS should be used to indicate that all processing completed successfully. Any other result will indicate that a problem occurred during processing.
      • setStartpoint

        public abstract void setStartpoint​(SetStartpointOptions options)
                                    throws EndpointException
        This method should effectively set the starting point for synchronization to the place specified by the options parameter. This should cause all changes previous to the specified start point to be disregarded and only changes after that point to be returned by getNextBatchOfChanges(int, AtomicLong).

        There are several different startpoint types (see SetStartpointOptions), and this implementation is not required to support them all. If the specified startpoint type is unsupported, this method should throw an UnsupportedOperationException. IMPORTANT: The RESUME_AT_SERIALIZABLE startpoint type must be supported by your implementation, because this is used when a Sync Pipe first starts up. The Serializable in this case is the same type that is returned by getStartpoint(); the Sync Server persists it and passes it back in on a restart.

        This method can be called from two different contexts:

        • When the 'set-startpoint' subcommand of the realtime-sync CLI is used (the Sync Pipe is required to be stopped in this context)
        • Immediately after a Sync Pipe starts up and a connection is first established to the source server (e.g. before the first call to getNextBatchOfChanges(int, AtomicLong))
        Parameters:
        options - an object which indicates where exactly to start synchronizing (e.g. the end of the changelog, specific change number, a certain time ago, etc)
        Throws:
        EndpointException - if there is any error while setting the start point
      • getStartpoint

        public abstract java.io.Serializable getStartpoint()
        Gets the current value of the startpoint for change detection. This is the "bookmark" which indicates which changes have already been processed and which have not. In most cases, a change number is used to detect changes and is managed by the Data Sync Server, in which case this implementation needs only to return the latest acknowledged change number. In other cases, the return value may correspond to a different value, such as the SYS_CHANGE_VERSION in Microsoft SQL Server. In any case, this method should return the value that is updated by acknowledgeCompletedOps(LinkedList).

        This method is called periodically and the return value is saved in the persistent state for the Sync Pipe that uses this extension as its Sync Source. IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via acknowledgeCompletedOps(LinkedList)). Otherwise it will be possible for changes to be missed when the Data Sync Server is restarted or a connection error occurs.

        Returns:
        a value to store in the persistent state for the Sync Pipe. This is usually a change number, but if a changelog table is not used to detect changes, this value should represent some other token to pass into setStartpoint(SetStartpointOptions) when the sync pipe starts up.
      • getNextBatchOfChanges

        public abstract java.util.List<ChangeRecordgetNextBatchOfChanges​(int maxChanges,
                                                                           java.util.concurrent.atomic.AtomicLong numStillPending)
                                                                    throws EndpointException
        Return the next batch of change records from the source. Change records are usually just hints that a change happened; they do not include the full contents of the target entry. In an effort to never synchronize stale data, the Data Sync Server will go back and fetch the full target entry for each change record.

        On the first invocation, this should return changes starting from the startpoint that was set by setStartpoint(SetStartpointOptions). This method is also responsible for updating the internal state such that subsequent invocations do not return duplicate changes.

        The resulting list should be limited by maxChanges. The numStillPending reference should be set to the estimated number of changes that haven't yet been retrieved from the source endpoint when this method returns, or zero if all the current changes have been retrieved. IMPORTANT: While this method needs to keep track of which changes have already been returned so that it does not return them again, it should NOT modify the official startpoint. The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via acknowledgeCompletedOps(LinkedList)). Otherwise it will be possible for changes to be missed when the Data Sync Server is restarted or a connection error occurs. The startpoint should not change as a result of this method.

        This method does not need to be thread-safe. It will be invoked repeatedly by a single thread, based on the polling interval set in the Sync Pipe configuration.

        Parameters:
        maxChanges - the maximum number of changes to retrieve
        numStillPending - this should be set to the number of unretrieved changes that are still pending after this batch has been retrieved. This will be passed in as zero, and may be left that way if the actual value cannot be determined.
        Returns:
        a list of ChangeRecord instances, each corresponding to a single change at the source endpoint. If there are no new changes to return, this method should return an empty list.
        Throws:
        EndpointException - if there is any error while retrieving the next batch of changes
      • acknowledgeCompletedOps

        public abstract void acknowledgeCompletedOps​(java.util.LinkedList<SyncOperation> completedOps)
                                              throws EndpointException
        Provides a way for the Data Sync Server to acknowledge back to the extension which sync operations it has processed. This method should update the official startpoint which was set by setStartpoint(SetStartpointOptions) and is returned by getStartpoint(). IMPORTANT: The internal value for the startpoint should only be updated after a sync operation is acknowledged back to this extension (via this method). Otherwise it will be possible for changes to be missed when the Data Sync Server is restarted or a connection error occurs.
        Parameters:
        completedOps - a list of SyncOperations that have finished processing. The records are listed in the order they were first detected.
        Throws:
        EndpointException - if there is an error acknowledging the changes back to the source