terraform-aws-arc-msk
Overview
SourceFuse AWS Reference Architecture (ARC) Terraform module for managing Amazon MSK (Managed Streaming for Apache Kafka) resources.
Features
- Create an MSK cluster with customizable broker configuration
- Configure encryption in transit and at rest
- Set up authentication methods (SASL/SCRAM, IAM, TLS)
- Enable monitoring with Prometheus JMX and Node exporters
- Configure logging to CloudWatch, Kinesis Firehose, or S3
- Create and manage MSK configurations
- Associate SCRAM secrets for authentication
- Deploy MSK Connectors to stream data in and out of Kafka
- Create and manage custom plugins stored in S3
- Configure MSK Connect worker configuration via properties file
- Deploy MSK Connect connectors with support for autoscaling or provisioned mode
Introduction
This Terraform module provisions a fully configurable Amazon MSK (Managed Streaming for Apache Kafka) cluster with support for encryption, authentication (IAM, TLS, SASL/SCRAM), monitoring, and logging. It also deploys MSK Connect components, including custom plugins, worker configurations, and connectors with autoscaling or provisioned capacity, and can deliver logs to CloudWatch, Firehose, or S3. Together, these components support end-to-end Kafka data streaming pipelines.
Usage
To see a full example, check out the main.tf file in the example folder.
Basic Usage
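A minimal sketch of a provisioned (standard) cluster, using only inputs from the Inputs table. The registry source address is an assumption, the subnet and security group IDs are placeholders, and the sketch assumes that cluster_type = "provisioned" together with create_msk_components = true is what enables the standard cluster submodule. Pin a module version in real use.

```hcl
module "msk" {
  # Assumed registry address; check the module's actual source before use.
  source = "sourcefuse/arc-msk/aws"

  cluster_type          = "provisioned"
  create_msk_components = true # flag that creates the MSK Standard cluster

  cluster_name           = "example-msk"
  kafka_version          = "3.6.0"
  broker_instance_type   = "kafka.m5.large"
  number_of_broker_nodes = 2 # must be a multiple of the number of subnets

  # Placeholder IDs: two subnets in different Availability Zones.
  subnet_ids         = ["subnet-aaaa1111", "subnet-bbbb2222"]
  security_group_ids = ["sg-cccc3333"]

  client_broker_encryption = "TLS"
  in_cluster_encryption    = true
  enhanced_monitoring      = "PER_BROKER"

  tags = {
    Environment = "dev"
  }
}
```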
MSK Connect Data Sink: Aurora PostgreSQL to Amazon S3
This Terraform example provisions MSK Connect components that enable data ingestion from an Amazon Aurora PostgreSQL database into Amazon S3, using Kafka Connect and Confluent plugins.
Prerequisites:
Before running the Terraform example in example/msk-connect, ensure the following components are pre-configured in your AWS environment:
- Aurora PostgreSQL setup
  - An Aurora PostgreSQL cluster is already created.
  - A database named myapp exists within the cluster.
  - A sample table named users exists under the public schema, with sample data inserted.
- VPC configuration
  - A VPC endpoint for S3 (Gateway type) is created to allow private communication between MSK Connect and S3.
- Plugins downloaded and uploaded to S3
  Download the required Kafka Connect plugins and upload them to the appropriate S3 bucket:
  - JDBC source plugin: confluentinc-kafka-connect-jdbc-10.6.6.zip
  - S3 sink plugin: confluentinc-kafka-connect-s3-10.6.6.zip
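If you manage these prerequisites in Terraform as well, the S3 gateway endpoint and the plugin upload might look like the sketch below. It uses standard AWS provider 5.x resources; the variable names, the local plugins/ directory holding the downloaded archives, and the plugins/ key prefix in the bucket are hypothetical choices for illustration.

```hcl
# Hypothetical inputs; replace with values from your environment.
variable "vpc_id" {
  type = string
}

variable "private_route_table_ids" {
  type = list(string)
}

variable "plugin_bucket_name" {
  type = string
}

data "aws_region" "current" {}

# Gateway endpoint so MSK Connect workers reach S3 without leaving the VPC.
resource "aws_vpc_endpoint" "s3" {
  vpc_id            = var.vpc_id
  service_name      = "com.amazonaws.${data.aws_region.current.name}.s3"
  vpc_endpoint_type = "Gateway"
  route_table_ids   = var.private_route_table_ids
}

# Upload the locally downloaded plugin archives to the plugin bucket.
resource "aws_s3_object" "connect_plugins" {
  for_each = toset([
    "confluentinc-kafka-connect-jdbc-10.6.6.zip",
    "confluentinc-kafka-connect-s3-10.6.6.zip",
  ])

  bucket = var.plugin_bucket_name
  key    = "plugins/${each.value}"
  source = "${path.module}/plugins/${each.value}"
  etag   = filemd5("${path.module}/plugins/${each.value}") # re-upload on change
}
```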
Module Overview
Once the above prerequisites are met, deploy the Terraform example in example/msk-connect to configure the data pipeline.
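A hedged sketch of the source half of this pipeline: one module instance that creates the JDBC custom plugin, a worker configuration, and a connector that reads the users table into Kafka. It assumes the registry source address, a cluster module instance named msk with IAM authentication enabled, the plugin key prefix plugins/, an id column on users, and MSK Connect version 2.7.1; the connector_configuration keys are standard Confluent JDBC source settings, and the endpoint, user, bucket, and network IDs are placeholders. The S3 sink side would be a second instance built the same way around the S3 sink plugin.

```hcl
# Hypothetical variable holding the Aurora password.
variable "db_password" {
  type      = string
  sensitive = true
}

module "msk_connect_jdbc_source" {
  # Assumed registry address; check the module's actual source before use.
  source = "sourcefuse/arc-msk/aws"

  # Custom plugin built from the JDBC archive uploaded to S3.
  create_custom_plugin = true
  plugin_name          = "jdbc-source-plugin"
  plugin_content_type  = "ZIP"
  plugin_s3_bucket_arn = "arn:aws:s3:::my-plugin-bucket" # placeholder
  plugin_s3_file_key   = "plugins/confluentinc-kafka-connect-jdbc-10.6.6.zip"

  # Worker configuration (connect-distributed.properties content).
  create_worker_configuration    = true
  worker_config_name             = "jdbc-source-worker-config"
  worker_properties_file_content = <<-EOT
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
  EOT

  # Connector that reads rows from myapp.public.users into Kafka.
  create_connector     = true
  connector_name       = "aurora-users-source"
  kafkaconnect_version = "2.7.1" # assumed MSK Connect version
  capacity_mode        = "autoscaling"
  bootstrap_servers    = module.msk.bootstrap_brokers_sasl_iam
  authentication_type  = "IAM"
  encryption_type      = "TLS"
  subnet_ids           = ["subnet-aaaa1111", "subnet-bbbb2222"] # placeholders
  security_group_ids   = ["sg-cccc3333"]                        # placeholder

  connector_configuration = {
    "connector.class"          = "io.confluent.connect.jdbc.JdbcSourceConnector"
    "tasks.max"                = "1"
    "connection.url"           = "jdbc:postgresql://<aurora-endpoint>:5432/myapp"
    "connection.user"          = "postgres" # placeholder
    "connection.password"      = var.db_password
    "table.whitelist"          = "users"
    "mode"                     = "incrementing"
    "incrementing.column.name" = "id" # assumes users has an id column
    "topic.prefix"             = "aurora-"
  }
}
```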
Requirements
Name | Version |
---|---|
terraform | > 1.4, < 2.0.0 |
aws | ~> 5.0 |
Providers
No providers.
Modules
Name | Source | Version |
---|---|---|
msk_cluster | ./modules/standard | n/a |
msk_connect | ./modules/connect | n/a |
msk_serverless | ./modules/serverless | n/a |
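The Modules table suggests the root module delegates to ./modules/serverless when cluster_type is "serverless". Under that assumption, a serverless cluster might be requested as sketched below; the source address and network IDs are placeholders. MSK Serverless supports IAM access control only, so sasl_iam_enabled stays true.

```hcl
module "msk_serverless" {
  # Assumed registry address; check the module's actual source before use.
  source = "sourcefuse/arc-msk/aws"

  cluster_type = "serverless"
  cluster_name = "example-msk-serverless"

  # Placeholder networking values.
  subnet_ids         = ["subnet-aaaa1111", "subnet-bbbb2222"]
  security_group_ids = ["sg-cccc3333"]

  # MSK Serverless clients authenticate with IAM.
  sasl_iam_enabled = true

  tags = {
    Environment = "dev"
  }
}

output "serverless_bootstrap" {
  value = module.msk_serverless.serverless_bootstrap_brokers_sasl_iam
}
```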
Resources¶
No resources.
Inputs
Name | Description | Type | Default | Required |
---|---|---|---|---|
authentication_type | Client authentication type (e.g., NONE, IAM) | string | "" | no |
autoscaling_max_worker_count | Maximum number of MSK Connect workers (autoscaling mode) | number | 2 | no |
autoscaling_mcu_count | Number of MCUs per worker (autoscaling mode) | number | 1 | no |
autoscaling_min_worker_count | Minimum number of MSK Connect workers (autoscaling mode) | number | 2 | no |
az_distribution | Distribution of broker nodes across Availability Zones. Currently the only valid value is DEFAULT | string | "DEFAULT" | no |
bootstrap_servers | Bootstrap servers for the Kafka cluster | string | "" | no |
broker_instance_type | Instance type for the Kafka brokers (e.g., kafka.m5.large) | string | "kafka.m5.large" | no |
broker_storage | Broker EBS storage configuration | object({...}) | {...} | no |
capacity_mode | Capacity mode for MSK Connect: autoscaling or provisioned | string | "autoscaling" | no |
client_authentication | Cluster-level client authentication options | object({...}) | {} | no |
client_broker_encryption | Encryption setting for client-broker communication. Valid values: TLS, TLS_PLAINTEXT, and PLAINTEXT | string | "TLS" | no |
cloudwatch_retention_in_days | CloudWatch log retention period, in days | number | 7 | no |
cluster_configuration | Configuration block for MSK | object({...}) | {...} | no |
cluster_name | Name of the MSK cluster | string | null | no |
cluster_type | Type of MSK cluster. Valid values: provisioned, serverless, or null | string | null | no |
connectivity_config | Connectivity settings for public and VPC access | object({...}) | {} | no |
connector_configuration | Configuration map for the connector | map(string) | {} | no |
connector_name | Name of the MSK Connect connector | string | "" | no |
create_cluster_policy | Whether to create the MSK cluster policy | bool | false | no |
create_connector | Whether to create the MSK Connect connector | bool | false | no |
create_custom_plugin | Whether to create the custom plugin | bool | false | no |
create_msk_components | Flag to control creation of the MSK Standard cluster | bool | false | no |
create_worker_configuration | Whether to create the worker configuration | bool | false | no |
encryption_type | Encryption type (e.g., TLS, PLAINTEXT) | string | "" | no |
enhanced_monitoring | Enhanced MSK CloudWatch monitoring level. Valid values: DEFAULT, PER_BROKER, PER_TOPIC_PER_BROKER, or PER_TOPIC_PER_PARTITION | string | "DEFAULT" | no |
in_cluster_encryption | Whether data communication among broker nodes is encrypted | bool | true | no |
kafka_version | Kafka software version | string | "3.6.0" | no |
kafkaconnect_version | Version of Kafka Connect | string | "" | no |
kms_config | Configuration for the KMS key. If create is true, a new KMS key is created; if false, provide an existing key_arn | object({...}) | {...} | no |
log_delivery_cloudwatch_enabled | Enable CloudWatch log delivery | bool | false | no |
log_delivery_firehose_delivery_stream | Kinesis Firehose delivery stream name | string | "" | no |
log_delivery_firehose_enabled | Enable Firehose log delivery | bool | false | no |
log_delivery_s3_bucket | S3 bucket name for log delivery | string | "" | no |
log_delivery_s3_enabled | Enable S3 log delivery | bool | false | no |
log_delivery_s3_prefix | S3 prefix for log delivery | string | "" | no |
logging_config | Logging settings | object({...}) | {} | no |
monitoring_info | Open monitoring exporter settings | object({...}) | {} | no |
msk_connector_policy_arns | Map of IAM policy ARNs to attach to the MSK Connect connector execution role | map(string) | {} | no |
number_of_broker_nodes | Total number of broker nodes in the Kafka cluster. Must be a multiple of the number of client subnets | number | 2 | no |
plugin_content_type | Content type of the plugin (ZIP or JAR) | string | "" | no |
plugin_description | Description of the custom plugin | string | null | no |
plugin_name | Name of the custom plugin | string | "" | no |
plugin_s3_bucket_arn | ARN of the S3 bucket containing the plugin | string | "" | no |
plugin_s3_file_key | S3 key of the plugin file | string | "" | no |
policy_statements | List of policy statements for the MSK cluster policy | list(object({...})) | [] | no |
provisioned_mcu_count | Number of MCUs per worker (provisioned mode) | number | 2 | no |
provisioned_worker_count | Number of MSK Connect workers (provisioned mode) | number | 1 | no |
sasl_iam_enabled | Enable IAM-based SASL authentication | bool | true | no |
scale_in_cpu_utilization_percentage | CPU utilization percentage that triggers scale-in | number | 20 | no |
scale_out_cpu_utilization_percentage | CPU utilization percentage that triggers scale-out | number | 75 | no |
scram_credentials | SCRAM credentials for MSK authentication. Both username and password are optional and are generated if not provided | object({...}) | null | no |
security_group_ids | List of security group IDs (up to five) | list(string) | [] | no |
storage_autoscaling_config | Configuration for MSK broker storage autoscaling | object({...}) | {...} | no |
storage_mode | Storage mode for supported storage tiers. Valid values: LOCAL or TIERED | string | null | no |
subnet_ids | List of subnet IDs in at least two different Availability Zones | list(string) | [] | no |
tags | Map of tags to assign to the MSK resources | map(string) | {} | no |
vpc_connections | Map of MSK VPC connection configurations. Each key is a unique connection name; each value is an object with authentication, client_subnets, security_groups, target_cluster_arn, vpc_id, and optional tags | map(object({...})) | {} | no |
vpc_connectivity_client_authentication | Client authentication for VPC connectivity | object({...}) | {} | no |
worker_config_description | Description of the worker configuration | string | null | no |
worker_config_name | Name of the worker configuration | string | "" | no |
worker_properties_file_content | Contents of the connect-distributed.properties file | string | "" | no |
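The capacity_mode input selects between the two MSK Connect capacity models: autoscaling, where the worker count moves between autoscaling_min_worker_count and autoscaling_max_worker_count based on the CPU thresholds, and provisioned, where provisioned_worker_count workers run at all times. A hedged fragment for provisioned mode follows; the source address is assumed, the other required connector inputs are omitted, and it assumes the module ignores the autoscaling inputs in this mode.

```hcl
module "msk_connect_provisioned" {
  # Assumed registry address; check the module's actual source before use.
  source = "sourcefuse/arc-msk/aws"

  create_connector = true
  connector_name   = "example-provisioned-connector"

  # Fixed capacity: this many workers, each with this many MCUs,
  # regardless of load.
  capacity_mode            = "provisioned"
  provisioned_worker_count = 2
  provisioned_mcu_count    = 1

  # autoscaling_* and scale_*_cpu_utilization_percentage are presumably
  # ignored unless capacity_mode = "autoscaling".

  # Plugin, worker configuration, networking, bootstrap_servers, and
  # connector_configuration inputs are omitted here for brevity.
}
```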
Outputs
Name | Description |
---|---|
bootstrap_brokers | Bootstrap brokers |
bootstrap_brokers_public_sasl_iam | Public SASL IAM bootstrap brokers |
bootstrap_brokers_public_sasl_scram | Public SASL SCRAM bootstrap brokers |
bootstrap_brokers_public_tls | Public TLS bootstrap brokers |
bootstrap_brokers_sasl_iam | SASL IAM bootstrap brokers |
bootstrap_brokers_sasl_scram | SASL SCRAM bootstrap brokers |
bootstrap_brokers_tls | TLS bootstrap brokers |
bootstrap_brokers_vpc_connectivity_sasl_iam | VPC SASL IAM bootstrap brokers |
bootstrap_brokers_vpc_connectivity_sasl_scram | VPC SASL SCRAM bootstrap brokers |
bootstrap_brokers_vpc_connectivity_tls | VPC TLS bootstrap brokers |
cluster_arn | Amazon Resource Name (ARN) of the MSK cluster |
cluster_name | MSK Cluster Name |
cluster_policy_id | ID of the MSK cluster policy |
cluster_uuid | UUID of the MSK cluster |
configuration_latest_revision | Latest revision of the MSK configuration |
current_version | Current version of the MSK cluster |
kms_key_alias | KMS Key Alias |
msk_connect_connector_arn | ARN of the MSK Connect connector |
msk_connect_custom_plugin_arn | ARN of the custom plugin |
msk_connect_service_execution_role_arn | ARN of the MSK Connector IAM Role |
msk_connect_worker_configuration_arn | ARN of the worker configuration |
serverless_bootstrap_brokers_sasl_iam | Bootstrap broker endpoints with SASL IAM |
serverless_cluster_uuid | UUID of the MSK serverless cluster |
serverless_msk_cluster_arn | ARN of the MSK serverless cluster |
storage_mode | Storage mode for the MSK cluster |
zookeeper_connect_string | Zookeeper connect string |
zookeeper_connect_string_tls | Zookeeper TLS connect string |
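The outputs can be wired into other configuration. As an illustration, the sketch below assumes a cluster module instance named msk with IAM authentication enabled, exports its IAM bootstrap brokers, and builds a minimal client policy from cluster_arn. Only cluster-level actions are granted; topic and consumer-group permissions, which need their own resource ARNs, are left out.

```hcl
# Minimal cluster-level permissions for an IAM-authenticated Kafka client.
data "aws_iam_policy_document" "kafka_client" {
  statement {
    sid = "ConnectToCluster"
    actions = [
      "kafka-cluster:Connect",
      "kafka-cluster:DescribeCluster",
    ]
    resources = [module.msk.cluster_arn]
  }
}

output "kafka_bootstrap_brokers_iam" {
  description = "SASL/IAM bootstrap brokers for Kafka clients"
  value       = module.msk.bootstrap_brokers_sasl_iam
}
```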
Versioning
This project uses a .version file at the root of the repo, which the pipeline reads to create a git tag.
When you intend to commit to main, increment this version. Once the change is merged, the pipeline runs and tags the latest git commit.
Development
Prerequisites
Configurations
- Configure pre-commit hooks
Versioning
When contributing or committing, specify the type of change (major, minor, or patch) in your commit message.
Tests
- Tests are available in the test directory.
- Configure the dependencies.
- Execute the tests.
Authors
This project is authored by:
- SourceFuse ARC Team