/*
* CDDL HEADER START
*
* The contents of this file are subject to the terms of the
* Common Development and Distribution License, Version 1.0 only
* (the "License"). You may not use this file except in compliance
* with the License.
*
* You can obtain a copy of the license at
* docs/licenses/cddl.txt
* or http://www.opensource.org/licenses/cddl1.php.
* See the License for the specific language governing permissions
* and limitations under the License.
*
* When distributing Covered Code, include this CDDL HEADER in each
* file and include the License file at
* docs/licenses/cddl.txt. If applicable,
* add the following below this CDDL HEADER, with the fields enclosed
* by brackets "[]" replaced with your own identifying information:
* Portions Copyright [yyyy] [name of copyright owner]
*
* CDDL HEADER END
*
*
* Copyright 2019-2020 Ping Identity Corporation
*/
package com.unboundid.directory.sdk.examples;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.unboundid.directory.sdk.common.types.LogSeverity;
import com.unboundid.directory.sdk.sync.api.KafkaSyncDestinationPlugin;
import com.unboundid.directory.sdk.sync.config.KafkaSyncDestinationPluginConfig;
import com.unboundid.directory.sdk.sync.types.KafkaMessage;
import com.unboundid.directory.sdk.sync.types.PreStepResult;
import com.unboundid.directory.sdk.sync.types.SyncOperation;
import com.unboundid.directory.sdk.sync.types.SyncServerContext;
import com.unboundid.ldap.sdk.ResultCode;
import com.unboundid.util.args.ArgumentException;
import com.unboundid.util.args.ArgumentParser;
import com.unboundid.util.args.StringArgument;
import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* This class provides a simple example of a Kafka sync destination plugin
* which filters out test data and removes passwords from Kafka messages.
* Currently, a single API is provided for manipulating messages prior to
* their being published to a Kafka topic.
* <UL>
* <LI>
 * {@link #prePublish(KafkaMessage, SyncOperation)} -- Allows for the
* transformation or filtering of messages prior to publishing.
* </LI>
* </UL>
*
* It takes the following argument:
* <UL>
 * <LI>filter-attribute -- One or more attribute names which will
 *                         be removed from any messages prior to
 *                         publishing (see the example below).</LI>
* </UL>
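 *
 * For example, with {@code filter-attribute=userPassword} configured, a
 * message value such as the following (a hypothetical sketch; the actual
 * message format is determined by the Data Sync Server, and only the
 * "current" and "previous" fields that this plugin inspects are shown):
 * <PRE>
 *   { "current" : { "uid" : "jdoe", "userPassword" : "secret123" } }
 * </PRE>
 * would be published as:
 * <PRE>
 *   { "current" : { "uid" : "jdoe" } }
 * </PRE>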
*
 * Note: This extension is provided as an example of the capabilities
 * of a Kafka sync destination plugin. In a production environment,
 * attribute modification and filtering would typically be handled by
 * attribute mappings and would not require the use of any plugins.
*
*/
public class ExampleKafkaSyncDestinationPlugin
extends KafkaSyncDestinationPlugin {
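  // A single shared ObjectMapper is used because Jackson's ObjectMapper is
  // thread-safe once configured.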
private static final ObjectMapper MAPPER = new ObjectMapper();
private static final String ARG_NAME_FILTER_ATTRIBUTES =
"filter-attribute";
private SyncServerContext serverContext;
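  // Declared volatile because applyConfiguration() may replace this list on
  // one thread while prePublish() reads it on another.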
private volatile List<String> filterAttributes;
/**
* Retrieves a human-readable name for this extension.
*
* @return A human-readable name for this extension.
*/
@Override
public String getExtensionName() {
return "Example Kafka Sync Destination Plugin";
}
/**
* Retrieves a human-readable description for this extension. Each element
* of the array that is returned will be considered a separate paragraph in
* generated documentation.
*
* @return A human-readable description for this extension, or {@code null}
* or an empty array if no description should be available.
*/
@Override
public String[] getExtensionDescription() {
return new String[]{
"This is a simple example of a Kafka sync destination plugin " +
"which will filter out any messages intended to be " +
"test data. Additionally, it can be configured to filter " +
"out individual attributes on messages. For example, " +
"passwords or other sensitive attributes."
};
}
/**
   * Updates the provided argument parser to define any configuration
   * arguments which may be used by this Kafka sync destination plugin. The
   * argument parser may also be updated to define relationships between
   * arguments (e.g., to specify required, exclusive, or dependent argument
   * sets).
*
   * @param parser The argument parser to be updated with the configuration
   *               arguments which may be used by this Kafka sync
   *               destination plugin.
*
* @throws ArgumentException If a problem is encountered while updating the
* provided argument parser.
*/
@Override
public void defineConfigArguments(final ArgumentParser parser)
throws ArgumentException {
// Add an argument that allows for the specification of a list
    // of attribute names to filter out of Kafka messages.
Character shortIdentifier = null;
String longIdentifier = ARG_NAME_FILTER_ATTRIBUTES;
boolean required = false;
    int maxOccurrences = -1; // values <= 0 allow unlimited occurrences
String placeHolder = "{attr-name}";
String description = "The name of the attribute (post mapping) whose" +
" value should be filtered out of any messages.";
StringArgument arg = new StringArgument(
shortIdentifier, longIdentifier,
        required, maxOccurrences,
placeHolder, description
);
parser.addArgument(arg);
}
/**
* Initialize this Kafka sync destination plugin. This method will be called
* before any other methods in this class.
*
* @param serverContext A handle to the server context for the
* Data Sync Server in which this extension is
* running. Extensions should typically store this
* in a class member.
* @param config The general configuration for this plugin.
* @param parser The argument parser which has been initialized from
* the configuration for this Kafka sync destination
* plugin.
*/
@Override
public void initializeKafkaSyncDestinationPlugin(
final SyncServerContext serverContext,
final KafkaSyncDestinationPluginConfig config,
final ArgumentParser parser) {
this.serverContext = serverContext;
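    // Note: getValues() returns an empty list rather than null when the
    // argument has no values, so the list is always safe to iterate.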
this.filterAttributes =
parser.getStringArgument(ARG_NAME_FILTER_ATTRIBUTES).getValues();
}
/**
* Attempts to apply the configuration contained in the provided argument
* parser.
*
* @param config The general configuration for this Kafka sync
* destination plugin.
* @param parser The argument parser which has been
* initialized with the new configuration.
* @param adminActionsRequired A list that can be updated with information
* about any administrative actions that may be
* required before one or more of the
* configuration changes will be applied.
* @param messages A list that can be updated with information
* about the result of applying the new
* configuration.
*
* @return A result code that provides information about the result of
* attempting to apply the configuration change.
*/
@Override
public ResultCode applyConfiguration(
final KafkaSyncDestinationPluginConfig config,
final ArgumentParser parser,
final List<String> adminActionsRequired,
final List<String> messages
) {
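    // Replacing the volatile list reference makes the new filter attributes
    // take effect immediately for subsequent prePublish() calls, so no
    // administrative action is required.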
this.filterAttributes =
parser.getStringArgument(ARG_NAME_FILTER_ATTRIBUTES).getValues();
return ResultCode.SUCCESS;
}
/**
* Retrieves a map containing examples of configurations that may be used for
* this extension. The map key should be a list of sample arguments, and the
* corresponding value should be a description of the behavior that will be
* exhibited by the extension when used with that configuration.
*
* @return A map containing examples of configurations that may be used for
* this extension. It may be {@code null} or empty if there should
* not be any example argument sets.
*/
@Override
public Map<List<String>, String> getExamplesArgumentSets() {
final LinkedHashMap<List<String>, String> exampleMap =
new LinkedHashMap<>(1);
exampleMap.put(
Arrays.asList(
ARG_NAME_FILTER_ATTRIBUTES + "=userPassword",
ARG_NAME_FILTER_ATTRIBUTES + "=authPassword"
),
"Filter password attributes."
);
return exampleMap;
}
/**
* Provides the ability to alter or suppress Kafka messages.
* This method is called before a message is published to the
   * configured Kafka topic.
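   *
   * Returning {@link PreStepResult#ABORT_OPERATION} prevents the message
   * from being published, while {@link PreStepResult#CONTINUE} allows
   * publishing to proceed with the (possibly modified) message.
   *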
*
* @param message The message to be published to the
* Kafka topic.
* @param operation The synchronization operation for this
* change.
* @return The result of the plugin processing.
*/
@Override
public PreStepResult prePublish(final KafkaMessage message,
final SyncOperation operation) {
    // Skip any operations which contain test data. The message key may be
    // null if none was set, so guard against that before inspecting it.
    final String key = message.getKey();
    if ((key != null) && key.contains("test-data")) {
operation.logInfo("Not sending Kafka message for test data.");
return PreStepResult.ABORT_OPERATION;
}
// Retrieve the current message.
String currentMessageValueString = message.getValue();
try {
// Parse it into JSON.
JsonNode currentMessageValueJSON =
MAPPER.readTree(currentMessageValueString);
// Strip out sensitive fields
// Note: The removal of sensitive attributes can also be
// handled by attribute mapping.
for (String attrName : filterAttributes) {
for (String entryField : Arrays.asList("current", "previous")) {
if (currentMessageValueJSON.has(entryField)) {
if (currentMessageValueJSON.get(entryField).has(attrName)) {
operation.logDebug("Removing sensitive attribute " + attrName +
" from the operation.");
((ObjectNode) currentMessageValueJSON.get(entryField))
.remove(attrName);
}
}
}
}
// Save the filtered JSON back into the message.
message.setValue(currentMessageValueJSON.toString());
      // Allow publishing to continue with the filtered message.
return PreStepResult.CONTINUE;
} catch (IOException e) {
serverContext.debugCaught(e);
serverContext.logMessage(LogSeverity.FATAL_ERROR,
"Unable to parse Kafka Message as JSON! Exception: " +
e.getLocalizedMessage());
// Try again a few times in case of a transient issue.
return PreStepResult.RETRY_OPERATION_LIMITED;
}
}
/**
* Appends a string representation of this Kafka sync destination plugin to
* the provided buffer.
*
* @param buffer The buffer to which the string representation should be
* appended.
*/
@Override
public void toString(final StringBuilder buffer) {
buffer.append("ExampleKafkaSyncDestinationPlugin(");
    for (int i = 0; i < filterAttributes.size(); i++) {
      if (i > 0) {
        buffer.append(", ");
      }
      buffer.append(ARG_NAME_FILTER_ATTRIBUTES).append("='")
          .append(filterAttributes.get(i)).append("'");
    }
buffer.append(")");
}
}