
terraform-aws-arc-msk


Overview

SourceFuse AWS Reference Architecture (ARC) Terraform module for provisioning and managing Amazon MSK resources.

Features

  • Create an MSK cluster with customizable broker configuration
  • Configure encryption in transit and at rest
  • Set up authentication methods (SASL/SCRAM, IAM, TLS)
  • Enable monitoring with Prometheus JMX and Node exporters
  • Configure logging to CloudWatch, Kinesis Firehose, or S3
  • Create and manage MSK configurations
  • Associate SCRAM secrets for authentication
  • Deploy MSK Connectors to stream data in and out of Kafka
  • Create and manage custom plugins stored in S3
  • Configure MSK Connect worker configuration via properties file
  • Deploy MSK Connect connectors with support for autoscaling or provisioned mode

Introduction

This Terraform module provisions a fully configurable Amazon MSK (Managed Streaming for Apache Kafka) cluster with support for encryption, authentication (IAM, TLS, SASL/SCRAM), monitoring, and logging. It also enables deployment of MSK Connect components including custom plugins, worker configurations, and connectors with autoscaling or provisioned capacity, along with log delivery to CloudWatch, Firehose, or S3—empowering end-to-end Kafka data streaming pipelines.

Usage

To see a full example, check out the main.tf file in the example folder.

Basic Usage

module "msk" {
  source      = "sourcefuse/arc-msk/aws"
  version     = "0.0.1"

  cluster_type           = "provisioned"
  cluster_name           = "example-msk-cluster"
  kafka_version          = "3.6.0"
  number_of_broker_nodes = 2
  broker_instance_type   = "kafka.m5.large"
  client_subnets         = data.aws_subnets.public.ids
  security_groups        = [module.security_group.id]
  broker_storage = {
    volume_size = 150
  }

  client_authentication = {
    sasl_scram_enabled           = true # When set to true, this will create secrets in AWS Secrets Manager.
    allow_unauthenticated_access = false
    sasl_iam_enabled             = true
  }
  # Enable CloudWatch logging
  logging_info = {
    cloudwatch_logs_enabled = true
  }

  # Enable monitoring
  monitoring_info = {
    jmx_exporter_enabled  = true
    node_exporter_enabled = true
  }

  tags = module.tags.tags
}
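The same module can provision a serverless cluster by switching cluster_type. The sketch below is a hypothetical minimal example: the input names (subnet_ids, security_group_ids) come from the inputs documented further down, while the data source and security-group module references are placeholders.

```hcl
# Hypothetical sketch of a serverless cluster; values are placeholders.
module "msk_serverless" {
  source  = "sourcefuse/arc-msk/aws"
  version = "0.0.1"

  cluster_type = "serverless"
  cluster_name = "example-msk-serverless"

  # Serverless clusters take VPC settings via subnet_ids / security_group_ids
  subnet_ids         = data.aws_subnets.private.ids
  security_group_ids = [module.security_group.id]

  tags = module.tags.tags
}
```

Note that MSK Serverless supports only IAM authentication, which is why the serverless outputs below expose only a SASL/IAM bootstrap endpoint.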

MSK Connect Data Sink: Aurora PostgreSQL to Amazon S3

This Terraform example provisions MSK Connect components that enable data ingestion from an Amazon Aurora PostgreSQL database into Amazon S3, using Kafka Connect and Confluent plugins.

Prerequisites:

Before running the Terraform example in example/msk-connect, ensure the following components are pre-configured in your AWS environment:

Aurora PostgreSQL Setup

  • An Aurora PostgreSQL cluster is already created.
  • A database named myapp is created within the cluster.
  • A sample table named users is present under schema public with sample data inserted.

VPC Configuration

  • A VPC Endpoint for S3 (Gateway type) is created to allow private communication between MSK Connect and S3.
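The example does not create the endpoint for you. One way to provision it is sketched below; the vpc_id and route-table references are placeholders, and the region in service_name must match your deployment.

```hcl
# Hypothetical sketch: S3 gateway endpoint so MSK Connect can reach S3 privately.
resource "aws_vpc_endpoint" "s3" {
  vpc_id            = module.vpc.id                      # placeholder reference
  service_name      = "com.amazonaws.us-east-1.s3"       # adjust region as needed
  vpc_endpoint_type = "Gateway"
  route_table_ids   = module.vpc.private_route_table_ids # placeholder reference
}
```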

Plugins Downloaded and Uploaded to S3

Download the required Kafka Connect plugins and upload them to the appropriate S3 bucket:

  • JDBC Source Plugin: confluentinc-kafka-connect-jdbc-10.6.6.zip
  • S3 Sink Plugin: confluentinc-kafka-connect-s3-10.6.6.zip
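After downloading the archives from Confluent Hub, one way to stage them is with aws_s3_object; this is a sketch assuming a hypothetical module.s3 bucket and a local plugins/ directory.

```hcl
# Hypothetical sketch: upload previously downloaded plugin archives to S3.
resource "aws_s3_object" "jdbc_plugin" {
  bucket = module.s3.bucket_id
  key    = "confluentinc-kafka-connect-jdbc-10.6.6.zip"
  source = "${path.module}/plugins/confluentinc-kafka-connect-jdbc-10.6.6.zip"
}

resource "aws_s3_object" "s3_sink_plugin" {
  bucket = module.s3.bucket_id
  key    = "confluentinc-kafka-connect-s3-10.6.6.zip"
  source = "${path.module}/plugins/confluentinc-kafka-connect-s3-10.6.6.zip"
}
```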

Module Overview

Once the above prerequisites are met, you can deploy the Terraform example to configure the data pipeline using:

# Source Connector

module "msk_connect" {
  source      = "sourcefuse/arc-msk/aws"
  version     = "0.0.1"

  # Enables MSK Connect components and plugins for source
  create_msk_components        = true
  create_custom_plugin         = true
  create_worker_configuration  = false
  create_connector             = true

  # Plugin and connector configurations
  plugin_name          = "jdbc-pg-plugin"
  plugin_content_type  = "ZIP"
  plugin_description   = "Custom plugin for MSK Connect"
  plugin_s3_bucket_arn = module.s3.bucket_arn
  plugin_s3_file_key   = "confluentinc-kafka-connect-jdbc-10.6.6.zip"

  connector_name       = "msk-pg-connector"
  kafkaconnect_version = "2.7.1"

  connector_configuration = {
    "connector.class" : "io.confluent.connect.jdbc.JdbcSourceConnector",
    ...
    "connection.url" : "jdbc:postgresql://${data.aws_ssm_parameter.db_endpoint.value}:5432/myapp"
  }

  ...
}

# Sink Connector

module "msk_s3_sink" {
  source      = "sourcefuse/arc-msk/aws"
  version     = "0.0.1"

  # Enables MSK Connect components and plugins for destination
  create_msk_components        = true
  create_custom_plugin         = true
  create_worker_configuration  = false
  create_connector             = true

  plugin_name          = "s3-sink-plugin"
  plugin_content_type  = "ZIP"
  plugin_description   = "Custom plugin for MSK Connect"
  plugin_s3_bucket_arn = module.s3.bucket_arn
  plugin_s3_file_key   = "confluentinc-kafka-connect-s3-10.6.6.zip"

  connector_name       = "msk-s3-sink-connector"
  kafkaconnect_version = "2.7.1"

  connector_configuration = {
    "connector.class" : "io.confluent.connect.s3.S3SinkConnector",
    ...
    "s3.bucket.name" : module.s3.bucket_id
  }

  ...
}
These modules create the MSK Connect plugins and connectors, enabling a seamless stream of data from PostgreSQL (the public.users table) to S3 (via the cdc_aurora_users topic).
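The elided connector_configuration blocks carry standard Kafka Connect properties. A hypothetical JDBC source configuration is sketched below: the property names come from the Confluent JDBC connector, but the values (user, password source, incrementing column) are placeholders, not values from this example.

```hcl
connector_configuration = {
  "connector.class"          = "io.confluent.connect.jdbc.JdbcSourceConnector"
  "tasks.max"                = "1"
  "connection.url"           = "jdbc:postgresql://<db-endpoint>:5432/myapp"
  "connection.user"          = "postgres"       # placeholder
  "connection.password"      = "<from-secrets>" # placeholder
  "table.whitelist"          = "users"
  "mode"                     = "incrementing"
  "incrementing.column.name" = "id"             # assumes an auto-incrementing id column
  "topic.prefix"             = "cdc_aurora_"    # yields topic cdc_aurora_users
}
```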

Requirements

Name Version
terraform > 1.4, < 2.0.0
aws ~> 5.0

Providers

No providers.

Modules

Name Source Version
msk_cluster ./modules/standard n/a
msk_connect ./modules/connect n/a
msk_serverless ./modules/serverless n/a

Resources

No resources.

Inputs

Name Description Type Default Required
authentication_type Client authentication type (e.g., NONE, IAM) string "" no
autoscaling_max_worker_count Maximum number of workers number 2 no
autoscaling_mcu_count Number of MCUs per worker number 1 no
autoscaling_min_worker_count Minimum number of workers number 2 no
az_distribution The distribution of broker nodes across availability zones. Currently the only valid value is DEFAULT string "DEFAULT" no
bootstrap_servers Bootstrap servers for the Kafka cluster string "" no
broker_instance_type Specify the instance type to use for the kafka brokers. e.g. kafka.m5.large string "kafka.m5.large" no
broker_storage Broker EBS storage configuration
object({
volume_size = number
provisioned_throughput_enabled = optional(bool, false)
volume_throughput = optional(number)
})
{
"volume_size": 100
}
no
capacity_mode The capacity mode for MSK Connect: 'autoscaling' or 'provisioned' string "autoscaling" no
client_authentication Cluster-level client authentication options
object({
sasl_scram_enabled = optional(bool, false)
sasl_iam_enabled = optional(bool, true)
tls_certificate_authority_arns = optional(list(string), [])
allow_unauthenticated_access = optional(bool, false)
})
{} no
client_broker_encryption Encryption setting for client broker communication. Valid values: TLS, TLS_PLAINTEXT, and PLAINTEXT string "TLS" no
cloudwatch_retention_in_days CloudWatch Retention Period Days number 7 no
cluster_configuration Configuration block for MSK
object({
create_configuration = bool
configuration_name = optional(string)
configuration_description = optional(string)
server_properties = optional(string)
configuration_arn = optional(string)
configuration_revision = optional(number)
})
{
"create_configuration": false
}
no
cluster_name Name of the MSK cluster string null no
cluster_type Type of MSK cluster. Valid values: provisioned, serverless, or null string null no
connectivity_config Connectivity settings for public and VPC access
object({
public_access_enabled = optional(bool, false)
public_access_type = optional(string, "SERVICE_PROVIDED_EIPS") # or "DISABLED"
})
{} no
connector_configuration Configuration map for the connector map(string) {} no
connector_name Name of the MSK Connect connector string "" no
create_cluster_policy Whether to create the MSK cluster policy bool false no
create_connector Whether to create the MSK connector bool false no
create_custom_plugin Whether to create the custom plugin bool false no
create_msk_components Flag to control creation of MSK Standard cluster bool false no
create_worker_configuration Whether to create the worker configuration bool false no
encryption_type Encryption type (e.g., TLS, PLAINTEXT) string "" no
enhanced_monitoring Specify the desired enhanced MSK CloudWatch monitoring level. Valid values: DEFAULT, PER_BROKER, PER_TOPIC_PER_BROKER, or PER_TOPIC_PER_PARTITION string "DEFAULT" no
in_cluster_encryption Whether data communication among broker nodes is encrypted. Default is true bool true no
kafka_version Specify the desired Kafka software version string "3.6.0" no
kafkaconnect_version Version of Kafka Connect string "" no
kms_config Configuration for KMS key. If create is true, a new KMS key will be created. If false, provide an existing key_arn.
object({
create = optional(bool, false)
key_arn = optional(string, null)
})
{
"create": false
}
no
log_delivery_cloudwatch_enabled Enable CloudWatch log delivery bool false no
log_delivery_firehose_delivery_stream Kinesis Firehose delivery stream name string "" no
log_delivery_firehose_enabled Enable Firehose log delivery bool false no
log_delivery_s3_bucket S3 bucket name for log delivery string "" no
log_delivery_s3_enabled Enable S3 log delivery bool false no
log_delivery_s3_prefix S3 prefix for log delivery string "" no
logging_config Logging settings
object({
cloudwatch_logs_enabled = optional(bool, false)
cloudwatch_log_group = optional(string)
cloudwatch_logs_retention_in_days = optional(number)
firehose_logs_enabled = optional(bool, false)
firehose_delivery_stream = optional(string)
s3_logs_enabled = optional(bool, false)
s3_logs_bucket = optional(string)
s3_logs_prefix = optional(string)
})
{} no
monitoring_info Open monitoring exporter settings
object({
jmx_exporter_enabled = optional(bool, false)
node_exporter_enabled = optional(bool, false)
})
{} no
msk_connector_policy_arns List of IAM policy ARNs to attach to the MSK Connector execution role map(string) {} no
number_of_broker_nodes The desired total number of broker nodes in the kafka cluster. It must be a multiple of the number of specified client subnets number 2 no
plugin_content_type Content type of the plugin (ZIP or JAR) string "" no
plugin_description Description of the custom plugin string null no
plugin_name Name of the custom plugin string "" no
plugin_s3_bucket_arn ARN of the S3 bucket containing the plugin string "" no
plugin_s3_file_key S3 key of the plugin file string "" no
policy_statements List of policy statements for the MSK cluster
list(object({
sid = string
effect = string
actions = list(string)
principal = map(any) # Allow "AWS", "Service", etc.
resources = list(string) # Optional, fallback to cluster_arn
}))
[] no
provisioned_mcu_count n/a number 2 no
provisioned_worker_count n/a number 1 no
sasl_iam_enabled Enable IAM-based SASL authentication bool true no
scale_in_cpu_utilization_percentage CPU utilization percentage for scale-in number 20 no
scale_out_cpu_utilization_percentage CPU utilization percentage for scale-out number 75 no
scram_credentials SCRAM credentials for MSK authentication.
- username: Optional. Will be generated if not provided.
- password: Optional. Will be generated if not provided.
object({
username = optional(string)
password = optional(string)
})
null no
security_group_ids List of security group IDs (up to five) list(string) [] no
storage_autoscaling_config Configuration for MSK broker storage autoscaling
object({
enabled = bool
max_capacity = optional(number, 250)
role_arn = optional(string, "")
target_value = optional(number, 70)
})
{
"enabled": false
}
no
storage_mode Controls storage mode for supported storage tiers. Valid values are: LOCAL or TIERED string null no
subnet_ids List of subnet IDs in at least two different Availability Zones list(string) [] no
tags A map of tags to assign to the MSK resources map(string) {} no
vpc_connections A map of MSK VPC connection configurations.
Each key is a unique connection name and value is an object with:
- authentication
- client_subnets
- security_groups
- target_cluster_arn
- vpc_id
- tags (optional)
map(object({
authentication = string
client_subnets = list(string)
security_groups = list(string)
vpc_id = string
}))
{} no
vpc_connectivity_client_authentication Client authentication for VPC connectivity
object({
sasl_scram_enabled = optional(bool, false)
sasl_iam_enabled = optional(bool, false)
tls_certificate_authority_arns = optional(list(string), [])
})
{} no
worker_config_description Description of the worker configuration string null no
worker_config_name Name of the worker configuration string "" no
worker_properties_file_content Contents of the connect-distributed.properties file string "" no

Outputs

Name Description
bootstrap_brokers Bootstrap brokers
bootstrap_brokers_public_sasl_iam Public SASL IAM bootstrap brokers
bootstrap_brokers_public_sasl_scram Public SASL SCRAM bootstrap brokers
bootstrap_brokers_public_tls Public TLS bootstrap brokers
bootstrap_brokers_sasl_iam SASL IAM bootstrap brokers
bootstrap_brokers_sasl_scram SASL SCRAM bootstrap brokers
bootstrap_brokers_tls TLS bootstrap brokers
bootstrap_brokers_vpc_connectivity_sasl_iam VPC SASL IAM bootstrap brokers
bootstrap_brokers_vpc_connectivity_sasl_scram VPC SASL SCRAM bootstrap brokers
bootstrap_brokers_vpc_connectivity_tls VPC TLS bootstrap brokers
cluster_arn Amazon Resource Name (ARN) of the MSK cluster
cluster_name MSK Cluster Name
cluster_policy_id ID of the MSK cluster policy
cluster_uuid UUID of the MSK cluster
configuration_latest_revision Latest revision of the MSK configuration
current_version Current version of the MSK cluster
kms_key_alias KMS Key Alias
msk_connect_connector_arn ARN of the MSK Connect connector
msk_connect_custom_plugin_arn ARN of the custom plugin
msk_connect_service_execution_role_arn ARN of the MSK Connector IAM Role
msk_connect_worker_configuration_arn ARN of the worker configuration
serverless_bootstrap_brokers_sasl_iam Bootstrap broker endpoints with SASL IAM
serverless_cluster_uuid UUID of the MSK serverless cluster
serverless_msk_cluster_arn ARN of the MSK serverless cluster
storage_mode Storage mode for the MSK cluster
zookeeper_connect_string Zookeeper connect string
zookeeper_connect_string_tls Zookeeper TLS connect string
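The broker endpoints above can be surfaced to other stacks or applications; a minimal sketch, assuming the module.msk instance from Basic Usage and a hypothetical SSM parameter name:

```hcl
# Expose IAM bootstrap brokers to consumers of this stack (sketch).
output "kafka_bootstrap_brokers_sasl_iam" {
  value = module.msk.bootstrap_brokers_sasl_iam
}

# Store them in SSM for application lookup (hypothetical parameter name).
resource "aws_ssm_parameter" "brokers" {
  name  = "/example/msk/bootstrap_brokers"
  type  = "String"
  value = module.msk.bootstrap_brokers_sasl_iam
}
```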

Versioning

This project uses a .version file at the root of the repo; the pipeline reads it and creates a matching git tag.

When you intend to commit to main, you will need to increment this version. Once the project is merged, the pipeline will kick off and tag the latest git commit.

Development

Prerequisites

Configurations

  • Configure pre-commit hooks
    pre-commit install
    

Versioning

When contributing or committing, specify the type of change in your commit message: major, minor, or patch.

For Example

git commit -m "your commit message #major"

By specifying this, the pipeline will bump the version accordingly. If you don't specify a type in your commit message, it defaults to a patch bump.
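The keyword matching can be sketched in shell; this is hypothetical logic illustrating the convention, not the actual pipeline script.

```shell
# Hypothetical sketch of how a pipeline might pick the bump type
# from the latest commit message; not the actual pipeline script.
msg="your commit message #major"
case "$msg" in
  *"#major"*) bump="major" ;;
  *"#minor"*) bump="minor" ;;
  *)          bump="patch" ;;   # default when no keyword is given
esac
echo "$bump"
```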

Tests

  • Tests are available in the test directory
  • Configure the dependencies
    cd test/
    go mod init github.com/sourcefuse/terraform-aws-refarch-<module_name>
    go get github.com/gruntwork-io/terratest/modules/terraform

  • Now execute the test
    go test -timeout 30m


Authors

This project is authored by:

  • SourceFuse ARC Team