RADAR-base/MongoDb-Sink-Connector

Name: MongoDb-Sink-Connector

Owner: RADAR-CNS

Description: Kafka MongoDb sink connector

Created: 2016-11-28 09:57:40.0

Updated: 2018-01-26 07:52:10.0

Pushed: 2018-01-15 10:51:10.0

Homepage:

Size: 355

Language: Java

README

Kafka MongoDb Sink Connector

The MongoDB-Sink-Connector is a Kafka Connect connector for scalable and reliable data streaming from one or more Kafka topics to one or more MongoDB collections. It consumes Avro data from Kafka topics, converts each record into a BSON Document, and inserts it into the corresponding MongoDB collection.

Currently, it supports records that have an Avro schema.

Installation

This connector can be used inside a Docker stack or installed as a general Kafka Connect plugin.

Docker installation

Use the radarcns/kafka-connect-mongodb-sink Docker image to run the connector inside a Docker infrastructure. For example, RADAR-Docker runs it from a Docker Compose file. The Kafka Connect Docker image requires a number of environment variables to be set. In RADAR-Docker, the following environment variables are used:

    environment:
      CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: "mongodb-sink"
      CONNECT_CONFIG_STORAGE_TOPIC: "mongodb-sink.config"
      CONNECT_OFFSET_STORAGE_TOPIC: "mongodb-sink.offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "mongodb-sink.status"
      CONNECT_KEY_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry-1:8081"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schema-registry-1:8081"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_OFFSET_STORAGE_FILE_FILENAME: "/tmp/mongdb-sink.offset"
      CONNECT_REST_ADVERTISED_HOST_NAME: "radar-mongodb-connector"
      CONNECT_ZOOKEEPER_CONNECT: zookeeper-1:2181
      CONNECT_LOG4J_LOGGERS: "org.reflections=ERROR"
      KAFKA_BROKERS: 3

Before starting the connector, the Docker image waits until KAFKA_BROKERS brokers and the schema registry are available.
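
For orientation, a minimal Docker Compose service definition might look like the sketch below. The service name, dependency list, and Kafka and schema registry hostnames are illustrative only; RADAR-Docker contains a complete, authoritative stack definition.

    services:
      radar-mongodb-connector:
        # Sketch only: adapt service name, dependencies, and hostnames to your stack
        image: radarcns/kafka-connect-mongodb-sink
        restart: on-failure
        depends_on:
          - kafka-1
          - schema-registry-1
        environment:
          CONNECT_BOOTSTRAP_SERVERS: PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092
          CONNECT_REST_PORT: 8083
          # ... remaining CONNECT_* variables and KAFKA_BROKERS as listed above ...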

System installation

This connector requires the following setup:

To install the connector, follow these steps:

Usage

Modify the sink.properties file according to your environment. The following properties are supported:

| Name | Description | Type | Default | Valid Values | Importance |
|------|-------------|------|---------|--------------|------------|
| mongo.database | MongoDB database name | string | | | high |
| mongo.host | MongoDB host name to write data to | string | | | high |
| topics | List of topics to be streamed. | list | | | high |
| collection.format | A format string for the destination collection name, which may contain `${topic}` as a placeholder for the originating topic name. For example, `kafka_${topic}` for the topic `orders` will map to the collection name `kafka_orders`. | string | {$topic} | | medium |
| mongo.password | Password to connect to MongoDB database. If not set, no credentials are used. | string | null | | medium |
| mongo.username | Username to connect to MongoDB database. If not set, no credentials are used. | string | null | | medium |
| record.converter.class | RecordConverterFactory that returns classes to convert Kafka SinkRecords to BSON documents. | class | class org.radarcns.connect.mongodb.serialization.RecordConverterFactory | | medium |
| buffer.capacity | Maximum number of items in a MongoDB writer buffer. Once the buffer becomes full, the task fails. | int | 20000 | [1,...] | low |
| mongo.port | MongoDB port | int | 27017 | [1,...] | low |
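
As an illustration, a minimal sink.properties could look like the sketch below. The topic name and MongoDB credentials are placeholders, and the connector.class value is an assumption based on the package naming used elsewhere in this README; check it against the built artifact.

    # Kafka Connect sink configuration (sketch; adapt values to your deployment)
    name=radar-mongodb-sink
    # Assumed connector class name; verify against the jar built from this project
    connector.class=org.radarcns.connect.mongodb.MongoDbSinkConnector
    tasks.max=1
    topics=application_record_counts
    mongo.host=localhost
    mongo.port=27017
    mongo.database=mydatabase
    mongo.username=myuser
    mongo.password=mypassword
    collection.format={$topic}
    record.converter.class=org.radarcns.connect.mongodb.serialization.RecordConverterFactory

This file is then passed to the Kafka Connect worker, together with the worker configuration, when the connector is started.
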
Developer guide

This MongoDB-Sink-Connector uses RecordConverters to convert a SinkRecord into a Document. The default RecordConverter is the GenericRecordConverter, which stores the record key as _id and adds a field for every field name in the record value. The GenericRecordConverter supports conversion of most primitive types and collections.
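
As a purely illustrative example (the field names are hypothetical and the exact representation of the key depends on its type), a record whose value contains the fields time and batteryLevel would be converted by the GenericRecordConverter into a document of roughly this shape:

    {
      "_id": "<record key>",
      "time": 60.0,
      "batteryLevel": 0.85
    }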

For Avro records with complex schemas, or for a custom collection format, it is recommended to write your own RecordConverter and register it with an extended RecordConverterFactory. Writing a custom RecordConverter is relatively straightforward: the interface requires two methods to be implemented.


    /**
     * Converts Kafka records to MongoDB documents.
     */
    public interface RecordConverter {
        /**
         * <p>The schema names used are the fully qualified (including namespace) and case-sensitive
         * names. If the converter requires records with both a key and a value schema, the returned
         * format is "KeySchemaName-ValueSchemaName". If the key is not required, only "ValueSchemaName"
         * may be returned. KeySchemaName and ValueSchemaName may be substituted by the Object class
         * that it supports. If the converter supports all types of data, return null.
         */
        Collection<String> supportedSchemaNames();

        /**
         * Convert a Kafka record to a BSON document.
         *
         * @param record record to convert
         * @return BSON document
         * @throws DataException if the record cannot be converted by the current converter.
         */
        Document convert(SinkRecord record) throws DataException;
    }
Sample RecordConverter Implementation
  1. Implement a custom RecordConverter. An example is given below. Consider a record with the following key schema

    
    {
      "namespace": "org.radarcns.key",
      "type": "record",
      "name": "MeasurementKey",
      "doc": "Measurement key in the RADAR-CNS project",
      "fields": [
        {"name": "userId", "type": "string", "doc": "user ID"},
        {"name": "sourceId", "type": "string", "doc": "device source ID"}
      ]
    }

    and a value-schema as below.

    
    {
      "namespace": "org.radarcns.application",
      "type": "record",
      "name": "ApplicationRecordCounts",
      "doc": "Number of records cached or created.",
      "fields": [
        {"name": "time", "type": "double", "doc": "device timestamp in UTC (s)"},
        {"name": "timeReceived", "type": "double", "doc": "device receiver timestamp in UTC (s)"},
        {"name": "recordsCached", "type": "int", "doc": "number of records currently being cached", "default": -1},
        {"name": "recordsSent", "type": "int", "doc": "number of records sent since application start"},
        {"name": "recordsUnsent", "type": "int", "doc": "number of unsent records", "default": -1}
      ]
    }

    These samples give us the KeySchemaName as org.radarcns.key.MeasurementKey and the ValueSchemaName as org.radarcns.application.ApplicationRecordCounts. Let's call our custom RecordConverter CountsStatusRecordConverter. The implementation can be as simple as below.

    
    /**
     * RecordConverter to convert a CountsStatus record to a MongoDB Document.
     */
    public class CountsStatusRecordConverter implements RecordConverter {

        /**
         * Returns the list of supported schemas, which behaves as the id to select the suitable
         * RecordConverter for a SinkRecord.
         *
         * @return a list of supported Schemas
         */
        @Override
        public Collection<String> supportedSchemaNames() {
            return Collections.singleton("org.radarcns.key.MeasurementKey" + "-"
                    + "org.radarcns.application.ApplicationRecordCounts");
        }

        /**
         * Converts a CountsStatus SinkRecord into a MongoDB Document.
         *
         * @param sinkRecord record to be converted
         * @return converted MongoDB Document to write
         */
        @Override
        public Document convert(SinkRecord sinkRecord) throws DataException {
            Struct key = (Struct) sinkRecord.key();
            Struct value = (Struct) sinkRecord.value();

            return new Document("_id", key.get("userId") + "-" + key.get("sourceId"))
                    .append("user", key.getString("userId"))
                    .append("source", key.getString("sourceId"))
                    .append("recordsCached", value.getInt32("recordsCached"))
                    .append("recordsSent", value.getInt32("recordsSent"))
                    .append("timestamp", Converter.toDateTime(value.get("timeReceived")));
        }
    }
  2. Register the implemented RecordConverter with an extended RecordConverterFactory.

    package org.radarcns.connect.mongodb.example;

    /**
     * Extended RecordConverterFactory to allow customized RecordConverter classes that are needed.
     */
    public class RecordConverterFactoryExample extends RecordConverterFactory {

        /**
         * Overrides genericConverters to append the custom RecordConverter class to the
         * RecordConverterFactory.
         *
         * @return list of RecordConverters available
         */
        protected List<RecordConverter> genericConverters() {
            List<RecordConverter> recordConverters = new ArrayList<RecordConverter>();
            recordConverters.addAll(super.genericConverters());
            recordConverters.add(new CountsStatusRecordConverter());
            return recordConverters;
        }
    }
    
  3. Use the extended RecordConverterFactoryExample in sink.properties (see the sketch after this list for starting a standalone worker with the example on the classpath).

    # Factory class to do the actual record conversion
    record.converter.class=org.radarcns.connect.mongodb.example.RecordConverterFactoryExample
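
With the Confluent platform installed, one common way to try such an example (not the only one) is to compile it, put the resulting jar on the Connect classpath, and start a standalone worker. The jar name and paths below are hypothetical.

    # Hypothetical jar name and path; adapt to your build output
    export CLASSPATH=/path/to/mongodb-sink-connector-example.jar
    connect-standalone standalone.properties sink.properties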
    
Notes

The only available setting is the number of records returned in a single call to poll() (i.e. the consumer.max.poll.records parameter in standalone.properties).
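
For example (the value shown is illustrative), the limit can be set in standalone.properties:

    # Number of records returned by a single consumer poll() (illustrative value)
    consumer.max.poll.records=500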

Connectors can run on any machine where Kafka has been installed; they do not need to run on a machine that hosts a Kafka broker.

To reset a connector running in standalone mode, stop it and then modify name in sink.properties and offset.storage.file.filename in standalone.properties.
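
For example (values are illustrative):

    # sink.properties
    name=radar-mongodb-sink-2
    # standalone.properties
    offset.storage.file.filename=/tmp/mongodb-sink-2.offset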

Contributing

Contributed code should be formatted according to the Google Java Code Style Guide. To contribute a feature or fix, please browse our issues and make a pull request.


This work is supported by the National Institutes of Health's National Center for Advancing Translational Sciences, Grant Number U24TR002306. This work is solely the responsibility of the creators and does not necessarily represent the official views of the National Institutes of Health.