项目展示

  • 首页Our ProjectsAWS Glue 通过互相 TLS 认证与 Amazon MSK 连接 大数据博客

AWS Glue 通过互相 TLS 认证与 Amazon MSK 连接 大数据博客

2026-01-27 11:46:10 24

AWS Glue 通过互信 TLS 认证连接 Amazon MSK

作者 Edward Ondari 和 Emmanuel Mashandudze日期 2024年8月7日来源 Amazon Managed Streaming for Apache Kafka (Amazon MSK)标签 Analytics AWS Glue 中级 (200) 技术如何做

AWS Glue 通过互相 TLS 认证与 Amazon MSK 连接 大数据博客

关键点总结

数据流挑战与机遇 企业需要有效的系统处理流数据,Apache Kafka 是一种理想的解决方案。AWS Glue 与 Amazon MSK 本文介绍了如何通过 AWS Glue 通过互信 TLS 认证来处理 Amazon MSK 的流数据。实用案例 医院监测病人温度示例,用于展示数据流过程和使用 AWS Athena 进行分析。互信 TLS 认证设置 步骤包括创建证书授权中心、设置 MSK 集群和创建 Kafka 连接。

在当今的数据环境中,数据源源不断地涌入,从社交媒体到物联网设备的数据读取。为了有效利用这些实时数据,组织需要强大的系统来进行数据的摄取、处理和分析。Apache Kafka 作为一个分布式流媒体平台,彻底改变了公司处理实时数据管道和构建响应式事件驱动应用的方式。而 AWS Glue 则用于处理和分析来自 Apache Kafka 的流数据。

Amazon Managed Streaming for Apache Kafka (Amazon MSK) 是一项完全托管的 Apache Kafka 服务。您可以在新的或现有的 MSK 集群上激活多种 认证模式,包括 AWS 身份与访问管理IAM控制、互信传输层安全性TLS和简单认证与安全层/盐挑战响应机制SASL/SCRAM。有关使用 IAM 认证的更多信息,请参考 安全处理来自 Amazon MSK Serverless 的近实时数据。

互信 TLS 认证 要求服务器和客户端都出示证书以证明其身份,适合需要共同认证模型的混合应用。这也是一种常用的系统,用于企业间的应用,比如 开放银行 集成,确保金融机构的安全开放 API。对于 Amazon MSK,采用 AWS 私有证书授权中心 (AWS Private CA) 来签发 X509 证书并进行客户端认证。

一元云购官网

本文将详细介绍如何设置 AWS Glue 作业以通过互信 TLS 认证,在 MSK 集群中生成、消费和处理消息。AWS Glue 会自动推断流数据的模式并将元数据存储在 AWS Glue 数据目录,供使用 分析工具如 Amazon Athena进行分析。

示例用例

在我们的示例用例中,医院设施定期使用智能温度计监测急诊病房中病人的体温。每台设备都会自动记录病人的体温并将记录发布到一个中央监测应用程序API。每个发布的记录都是一个 JSON 格式的消息,包含唯一标识温度计的 deviceId、标识病人的 patientId、病人的温度读数,以及记录温度的 eventTime。

中央监测应用程序会每小时检查每位病人的平均体温,在病人的平均温度超出接受阈值361372C时通知医院的医疗人员。在我们的案例中,我们将使用 Athena 控制台分析这些读取数据。

解决方案概述

在本文中,我们使用 AWS Glue Python Shell 作业模拟来自医院温度计的输入数据。该作业使用互信 TLS 认证将消息安全地写入 MSK 集群。

为了处理来自 MSK 集群的流数据,我们部署一个 AWS Glue Streaming 提取、转换和加载 (ETL) 作业。该作业会自动推断输入数据的模式,并将模式元数据存储在数据目录中,然后将处理后的数据作为高效的 Parquet 文件存储在 Amazon 简单存储服务 (Amazon S3) 中。我们使用 Athena 查询数据目录中的输出表并提取数据洞见。

以下图示展示了解决方案的架构。

解决方案工作流的步骤包括:

使用 AWS Certificate Manager (ACM) 创建一个私有证书授权中心 (CA)。设置具备互信 TLS 认证的 MSK 集群。创建一个 Java 密钥库 (JKS) 文件并生成客户端证书和私钥。在 AWS Glue 中创建 Kafka 连接。创建 AWS Glue 中的 Python Shell 作业来创建主题并将消息推送到 Kafka。创建 AWS Glue Streaming 作业来消费和处理消息。在 Athena 中分析处理后的数据。

前提条件

您需要具备以下条件:

访问 AWS CloudShell 或 AWS 命令行接口 (AWS CLI)。拥有一个 VPC,至少包含两个 Availability Zone 的两个子网和一个具有公共子网路由的 NAT 网关。您可以使用以下 AWS CloudFormation 堆栈设置 VPC:

该模板会创建两个 NAT 网关,如下图所示。然而,对于测试和开发工作负载,您可以将流量路由到一个 Availability Zone 的单个 NAT 网关。为了在生产工作负载中提供冗余,建议每个 Availability Zone 都有一个 NAT 网关可用。

该堆栈还会创建一个安全组,并添加一个 自引用规则,允许 AWS Glue 组件之间的通信。

使用 ACM 创建私有 CA

完成以下步骤以创建根 CA。有关更多详细信息,请参考 创建私有 CA。

在 AWS 私有 CA 控制台中,选择 创建私有 CA。对于 模式选项,选择 通用 或 短期证书 以降低价格。对于 CA 类型选项,选择 根。提供证书详细信息,至少提供一个唯一名称。

保持其余默认选项,选择确认复选框。选择 创建 CA。在 操作 菜单中,选择 安装 CA 证书,然后选择 确认并安装。

设置具有互信 TLS 认证的 MSK 集群

在设置 MSK 集群之前,请确保您拥有一个至少包含两个不同 Availability Zone 中的两个私有子网的 VPC,以及一个具有互联网访问权限的 NAT 网关。如前提条件部分所示,包含了一个 CloudFormation 模板。

完成以下步骤以设置您的集群:

在 Amazon MSK 控制台上,选择 创建集群。对于 创建方法,选择 自定义创建。对于 集群类型,选择 预配集群。对于 代理大小,您可以选择 kafkat3small 以应对本文目的。对于 可用区数,选择 2。选择 下一步。在 网络 部分,选择在前提条件部分中创建的 VPC、私有子网和安全组。在 安全设置 部分,选择 通过 AWS Certificate Manager (ACM) 进行 TLS 客户端认证。对于 AWS 私有 CA,选择您之前创建的 AWS 私有 CA。

创建 MSK 集群可能需要最多 30 分钟。

创建 JKS 文件并生成客户端证书和私钥

使用根 CA 后,您将生成用于认证的客户端证书。以下说明适用于 CloudShell,但也可以调整为在启用了 Java 和 AWS CLI 的客户端计算机上执行。

打开新的 CloudShell 会话,运行以下命令创建 certs 目录并安装 Java:

bashmkdir certscd certssudo yum y install java11amazoncorrettoheadless

运行以下命令以在 JKS 格式中创建包含私钥的密钥库文件。将 DistinguishedName、ExampleAlias、YourStorePass 和 YourKeyPass 替换为您选择的字符串:

bashkeytool genkey keystore kafkaclientkeystorejks validity 300 storepass YourStorePass keypass YourKeyPass dname CN=DistinguishedName alias ExampleAlias storetype pkcs12

使用在上一步创建的私钥生成证书签名请求CSR:

bashkeytool keystore kafkaclientkeystorejks certreq file csrpem alias ExampleAlias storepass YourStorePass keypass YourKeyPass

运行以下命令从文件开头和结尾处删除内容中的单词 NEW及后面跟随的空格:

bashsed i E 1 s/NEW //

该文件应以 BEGIN CERTIFICATE REQUEST 开头并以 END CERTIFICATE REQUEST 结束。

使用 CSR 文件生成客户端证书,使用以下命令来替换 PrivateCAARN 以填入您创建的私有 CA 的 ARN:

bashaws acmpca issuecertificate certificateauthorityarn PrivateCAARN csr fileb//csrpem signingalgorithm SHA256WITHRSA validity Value=300Type=DAYS

该命令应打印出颁发证书的 ARN。请保存 CertificateArn 值以供后续使用。

json{  CertificateArn arnawsacmpcaregionaccountcertificateauthority/CAID/certificate/certificateID}

使用 PrivateCAARN 和前一步中生成的 CertificateArn (arnawsacppcaltregiongt) 来获取签名的客户端证书。此命令将创建 clientcertpem 文件。

bashaws acmpca getcertificate certificateauthorityarn PrivateCAARN certificatearn CertificateARN jq r Certificate n CertificateChain gtgt clientcertpem

将证书添加到 Java 密钥库内,以便在与 MSK 代理通信时呈现:

bashkeytool keystore kafkaclientkeystorejks import file clientcertpem alias ExampleAlias storepass YourStorePass keypass YourKeyPass noprompt

从 JKS 文件中提取私钥。提供相同的 destkeypass 和 deststorepass,并在提示时输入密钥库密码。

bashkeytool importkeystore srckeystore kafkaclientkeystorejks destkeystore keystorep12 srcalias ExampleAlias deststorepass YourStorePass destkeypass YourKeyPass deststoretype PKCS12

将私钥转换为 PEM 格式。输入您在上一步提供的密钥库密码。

bashopenssl pkcs12 in keystorep12 nodes nocerts out privatekeypem

从文件头部删除以 Bag Attributes 开头的行:

bashsed i ne /BEGIN PRIVATE KEY//END PRIVATE KEY/p privatekeypem

上传 clientcertpem、clientkeystorejks 和 privatekeypem 文件到 Amazon S3。您可以选择 创建一个新的 S3 存储桶 或使用现有的存储桶来存储这些对象。将 lt s3//awsglueassets11111111222222useast1/certs/gt 替换为您的 S3 存储位置。

bashaws s3 sync /certs s3//awsglueassets11111111222222useast1/certs/ exclude include clientcertpem include privatekeypem include kafkaclientkeystorejks

在 AWS Glue 中创建 Kafka 连接

完成以下步骤以创建 Kafka 连接:

在 AWS Glue 控制台中,选择导航面板中的 数据连接。选择 创建连接。选择 Apache Kafka,然后选择 下一步。对于 Amazon Managed Streaming for Apache Kafka 集群,选择您之前创建的 MSK 集群。

为 认证方法 选择 TLS 客户端认证。输入您之前创建的密钥库的 S3 路径,并提供用于 storepass 和 keypass 的密钥库和客户端密钥密码。

在 网络选项 中,选择您的 VPC、一个私有子网和一个安全组。安全组应包含一个 自引用规则。在下一页面上,为连接提供一个名称例如 Kafkaconnection并选择 创建连接。

创建 AWS Glue 中的 Python Shell 作业以创建主题并将消息推送到 Kafka

订阅我们的时事通讯

获取更多更新