

terraform-aws-arc-kinesis-firehose



Overview

This module provisions and manages Kinesis Data Firehose delivery streams with full support for multiple destinations, IAM roles, CloudWatch logging, Lambda transformation, KMS encryption, and dynamic partitioning.

Features

  • Multiple destinations: extended_s3, redshift, opensearch, http_endpoint
  • Auto-created IAM role with least-privilege policies (or bring your own)
  • CloudWatch logging with auto-created log group and stream
  • KMS encryption support (AWS-managed or customer-managed)
  • Lambda data transformation via processing configuration
  • Parquet/ORC format conversion via AWS Glue
  • Dynamic partitioning with JQ metadata extraction
  • S3 backup for all non-S3 destinations
  • VPC support for OpenSearch destinations
  • Kinesis Data Stream as source

Usage

Basic S3

module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-stream"
  destination = "extended_s3"

  s3_configuration = {
    bucket_arn = aws_s3_bucket.my_bucket.arn
  }

  tags = { Environment = "prod" }
}

S3 with KMS Encryption

module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-encrypted-stream"
  destination = "extended_s3"

  s3_configuration = {
    bucket_arn = aws_s3_bucket.my_bucket.arn
  }

  kms_key_arn = aws_kms_key.my_key.arn

  tags = { Environment = "prod" }
}
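
Parquet Format Conversion

The module can also convert records to Parquet (or ORC) through an AWS Glue table before delivery. This is a sketch based on the enable_format_conversion, output_format, glue_database_name, and glue_table_name inputs documented below; the Glue catalog resources are assumed to exist elsewhere in your configuration:

```hcl
module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-parquet-stream"
  destination = "extended_s3"

  s3_configuration = {
    bucket_arn         = aws_s3_bucket.my_bucket.arn
    buffering_size     = 64
    # Parquet is already columnar-compressed; leave Firehose compression off
    compression_format = "UNCOMPRESSED"
  }

  enable_format_conversion = true
  output_format            = "PARQUET"
  glue_database_name       = aws_glue_catalog_database.analytics.name # assumed resource
  glue_table_name          = aws_glue_catalog_table.events.name       # assumed resource

  tags = { Environment = "prod" }
}
```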

With Lambda Transformation

module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-transform-stream"
  destination = "extended_s3"

  s3_configuration = {
    bucket_arn = aws_s3_bucket.my_bucket.arn
  }

  lambda_arn = aws_lambda_function.transformer.arn

  tags = { Environment = "prod" }
}

Redshift

module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-redshift-stream"
  destination = "redshift"

  s3_configuration = {
    bucket_arn = aws_s3_bucket.staging.arn
  }

  redshift_configuration = {
    cluster_jdbcurl = "jdbc:redshift://my-cluster.abc.us-east-1.redshift.amazonaws.com:5439/mydb"
    username        = "firehose_user"
    password        = var.redshift_password
    data_table_name = "events"
  }

  tags = { Environment = "prod" }
}

OpenSearch

module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-opensearch-stream"
  destination = "opensearch"

  s3_configuration = {
    bucket_arn = aws_s3_bucket.backup.arn
  }

  opensearch_domain_arn    = aws_opensearch_domain.my_domain.arn
  opensearch_configuration = {
    index_name = "my-index"
  }

  tags = { Environment = "prod" }
}
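
HTTP Endpoint

Delivery to a generic HTTP endpoint follows the same shape; this is a sketch based on the http_endpoint_configuration input documented below, with a placeholder URL and an access key supplied via a variable:

```hcl
module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-http-stream"
  destination = "http_endpoint"

  # Backup bucket for failed deliveries
  s3_configuration = {
    bucket_arn = aws_s3_bucket.backup.arn
  }

  http_endpoint_configuration = {
    url        = "https://example.com/ingest" # placeholder endpoint
    name       = "my-endpoint"
    access_key = var.http_endpoint_access_key
  }

  tags = { Environment = "prod" }
}
```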

Dynamic Partitioning

module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "partitioned-stream"
  destination = "extended_s3"

  s3_configuration = {
    bucket_arn          = aws_s3_bucket.my_bucket.arn
    buffering_size      = 64
    prefix              = "data/customer_id=!{partitionKeyFromQuery:customer_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/"
    error_output_prefix = "errors/!{firehose:error-output-type}/"
  }

  enable_dynamic_partitioning = true

  additional_processors = [{
    type = "MetadataExtraction"
    parameters = [
      { parameter_name = "JsonParsingEngine",       parameter_value = "JQ-1.6" },
      { parameter_name = "MetadataExtractionQuery", parameter_value = "{customer_id:.customer_id}" },
    ]
  }]

  tags = { Environment = "prod" }
}
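
Kinesis Data Stream Source

Instead of Direct PUT, the delivery stream can read from an existing Kinesis Data Stream. A sketch based on the kinesis_data_stream input documented below; the source stream is assumed to exist:

```hcl
module "firehose" {
  source  = "sourcefuse/arc-kinesis-firehose/aws"
  version = "0.0.1"

  name        = "my-sourced-stream"
  destination = "extended_s3"

  # Consume from an existing Kinesis Data Stream
  kinesis_data_stream = {
    stream_arn = aws_kinesis_stream.source.arn
  }

  s3_configuration = {
    bucket_arn = aws_s3_bucket.my_bucket.arn
  }

  tags = { Environment = "prod" }
}
```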

Examples

Example Description
basic-s3 Simple delivery to S3 with GZIP
s3-encrypted S3 with KMS encryption and optional Parquet
redshift Delivery to Redshift via S3 staging
lambda-transform Lambda data transformation before S3
opensearch Delivery to OpenSearch domain
dynamic-partitioning S3 with JQ-based dynamic partitioning

License

Apache 2.0 — see LICENSE.

Requirements

Name Version
terraform >= 1.5.0
aws >= 5.0, < 7.0

Providers

Name Version
aws 6.42.0

Modules

No modules.

Resources

Name Type
aws_cloudwatch_log_group.firehose resource
aws_cloudwatch_log_stream.firehose resource
aws_iam_role.firehose resource
aws_iam_role_policy.firehose resource
aws_kinesis_firehose_delivery_stream.this resource
aws_caller_identity.current data source
aws_iam_policy_document.firehose_assume_role data source
aws_iam_policy_document.firehose_policy data source
aws_partition.current data source
aws_region.current data source

Inputs

Name Description Type Default Required
additional_processors Additional processing configuration blocks (e.g., MetadataExtraction, RecordDeAggregation).
list(object({
type = string
parameters = optional(list(object({
parameter_name = string
parameter_value = string
})), [])
}))
[] no
create_iam_role Whether to create an IAM role for Firehose. Set false to provide an existing role via iam_role_arn. bool true no
destination Destination type. Valid values: extended_s3, redshift, opensearch, http_endpoint. string n/a yes
dynamic_partitioning_retry_duration Retry duration in seconds for dynamic partitioning (0–7200). number 300 no
enable_dynamic_partitioning Enable dynamic partitioning for extended_s3 destination. bool false no
enable_format_conversion Enable data format conversion (Parquet/ORC) via AWS Glue. bool false no
enable_sse Enable server-side encryption on the delivery stream. bool true no
glue_database_name Glue database name for schema. Required when enable_format_conversion is true. string null no
glue_role_arn IAM role ARN for Glue access. Defaults to the Firehose role. string null no
glue_table_name Glue table name for schema. Required when enable_format_conversion is true. string null no
http_endpoint_configuration Configuration block for HTTP endpoint destination.
object({
url = string
name = optional(string)
access_key = optional(string)
buffering_size = optional(number, 5)
buffering_interval = optional(number, 300)
retry_duration = optional(number, 300)
s3_backup_mode = optional(string, "FailedDataOnly")
content_encoding = optional(string, "NONE")
common_attributes = optional(list(object({ name = string, value = string })), [])
})
null no
iam_role_arn ARN of an existing IAM role. Required when create_iam_role is false. string null no
kinesis_data_stream Kinesis Data Stream source configuration.
object({
stream_arn = string
role_arn = optional(string, null)
})
null no
kms_key_arn ARN of a KMS key for server-side encryption. If null, AWS-managed key is used. string null no
lambda_arn ARN of the Lambda function for data transformation. Enables transformation when set. string null no
logging_config CloudWatch logging configuration for the delivery stream.
object({
enable = optional(bool, true)
log_group_name = optional(string, null)
log_stream_name = optional(string, null)
})
{} no
name Name of the Kinesis Firehose delivery stream. string n/a yes
opensearch_configuration Configuration block for OpenSearch destination.
object({
index_name = string
index_rotation_period = optional(string, "OneDay")
buffering_interval = optional(number, 300)
buffering_size = optional(number, 5)
retry_duration = optional(number, 300)
s3_backup_mode = optional(string, "FailedDocumentsOnly")
type_name = optional(string)
cluster_endpoint = optional(string)
})
null no
opensearch_domain_arn ARN of the OpenSearch domain. string null no
output_format Output format for format conversion. Valid values: PARQUET, ORC. string "PARQUET" no
redshift_configuration Configuration block for Redshift destination.
object({
cluster_jdbcurl = string
username = optional(string)
password = optional(string)
data_table_name = string
copy_options = optional(string)
data_table_columns = optional(string)
retry_duration = optional(number, 3600)
s3_backup_mode = optional(string, "Disabled")
})
null no
s3_backup_configuration S3 backup configuration for extended_s3 destination.
object({
mode = optional(string, "Disabled")
bucket_arn = optional(string, null)
})
{} no
s3_configuration S3 delivery/staging configuration.
object({
bucket_arn = optional(string, null)
prefix = optional(string, null)
error_output_prefix = optional(string, null)
buffering_size = optional(number, 5)
buffering_interval = optional(number, 300)
compression_format = optional(string, "UNCOMPRESSED")
})
{} no
tags Map of tags to assign to all resources. map(string) {} no
vpc_config VPC configuration for OpenSearch destination.
object({
subnet_ids = list(string)
security_group_ids = list(string)
role_arn = optional(string)
})
null no

Outputs

Name Description
iam_role_arn ARN of the IAM role used by Firehose.
iam_role_name Name of the IAM role created for Firehose (null if externally provided).
log_group_name CloudWatch log group name.
log_stream_name CloudWatch log stream name.
stream_arn ARN of the Kinesis Firehose delivery stream.
stream_name Name of the Kinesis Firehose delivery stream.
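The outputs can be wired into other resources. For example, granting a producer permission to write to the stream (a sketch; the producer role is an assumption, only module.firehose.stream_arn comes from this module):

```hcl
resource "aws_iam_role_policy" "producer_put" {
  name = "firehose-put"
  role = aws_iam_role.producer.id # hypothetical producer role

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect   = "Allow"
      Action   = ["firehose:PutRecord", "firehose:PutRecordBatch"]
      Resource = module.firehose.stream_arn
    }]
  })
}
```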

Versioning

This project uses a .version file at the root of the repo; the release pipeline reads it and creates a matching git tag.

Before merging to main, increment this version. Once the change is merged, the pipeline runs and tags the latest commit.

Development

Prerequisites

Configurations

  • Configure pre-commit hooks
    pre-commit install
    

Versioning

When contributing, indicate the type of change in your commit message so the pipeline can bump the version accordingly: #major, #minor, or #patch.

For example:

git commit -m "your commit message #major"

If you omit the tag, the pipeline defaults to a patch bump.

Tests

  • Tests are available in the test/ directory
  • Configure the dependencies
    cd test/
    go mod init github.com/sourcefuse/terraform-aws-refarch-<module_name>
    go get github.com/gruntwork-io/terratest/modules/terraform
    
  • Now execute the tests
    go test -timeout 30m
    

Authors

This project is authored by the SourceFuse ARC Team.