AWS Glue: Decrypting and Converting CSV to Parquet
I was given a task: accept GPG-encrypted CSV files, decrypt them with a secret key and passphrase stored in AWS Secrets Manager, and immediately convert them to Parquet, so that I can later point AWS Athena at the data and not worry about transformations, partitions, and so on. Everything should kick off by itself whenever a new file lands in S3.
S3
Since everything lives in AWS, I arm myself with Terraform and go create four buckets.
- raw - the csv.gpg files get uploaded directly here, and events from this bucket kick off the conversion process
- decrypted - decrypted files are written to this bucket
- parquet - this bucket stores the files converted from CSV to Parquet
- scripts - this bucket is for AWS Glue; it holds the code that Glue runs for decryption and conversion
1resource "aws_s3_bucket" "raw" {
2 bucket = "raw-csv-gpg"
3}
4
5# Отправляем евенты в eventbridge, что бы выцепить создание объекта
6resource "aws_s3_bucket_notification" "raw" {
7 bucket = aws_s3_bucket.raw.bucket
8 eventbridge = true
9}
10
11resource "aws_s3_bucket" "decrypted" {
12 bucket = "decrypted-csv"
13}
14
15resource "aws_s3_bucket" "parquet" {
16 bucket = "parquet"
17}
18
19resource "aws_s3_bucket" "scripts" {
20 bucket = "glue-scripts"
21}
22
23# Для простоты, просто загружаем готовые python файлы в бакет scripts
24resource "aws_s3_object" "decrypt_script" {
25 bucket = aws_s3_bucket.scripts.bucket
26 key = "decrypt.py"
27 source = "${path.module}/src/decrypt.py"
28}
29
30resource "aws_s3_object" "convert_script" {
31 bucket = aws_s3_bucket.scripts.bucket
32 key = "convert.py"
33 source = "${path.module}/src/convert.py"
34}
EventBridge
We create an EventBridge rule that watches the raw bucket; when it sees a file with the .csv.gpg suffix uploaded, it invokes a Lambda that starts the workflow. I can foresee the question: "EventBridge can start a workflow by itself, why did you add a Lambda?" My answer: I couldn't find a way to pass Glue a parameter saying which file to pick up and process. Without such a parameter, every run would process all the files sitting in the bucket, which is what I wanted to avoid.
1resource "aws_cloudwatch_event_rule" "s3_upload" {
2 name = "s3-csv-gpg-trigger"
3
4 event_pattern = jsonencode({
5 "source": ["aws.s3"],
6 "detail-type": ["Object Created"],
7 "detail": {
8 "bucket": {
9 "name": [aws_s3_bucket.raw.bucket]
10 },
11 "object": {
12 "key": [{ "suffix": "csv.gpg" }]
13 }
14 }
15 })
16}
17
18resource "aws_cloudwatch_event_target" "lambda_target" {
19 rule = aws_cloudwatch_event_rule.s3_upload.name
20 target_id = "InvokeLambdaStarter"
21 arn = aws_lambda_function.workflow_starter.arn
22}
23
24resource "aws_lambda_permission" "allow_cloudwatch" {
25 statement_id = "AllowExecutionFromCloudWatch"
26 action = "lambda:InvokeFunction"
27 function_name = aws_lambda_function.workflow_starter.function_name
28 principal = "events.amazonaws.com"
29 source_arn = aws_cloudwatch_event_rule.s3_upload.arn
30}
Lambda
A simple Lambda that listens to EventBridge: it sees the S3 event saying a file was created, pulls the file name out of the event, and starts the Glue workflow.
1data "archive_file" "lambda_zip" {
2 type = "zip"
3 source_file = "lambda/lambda_handler.py"
4 output_path = "lambda/lambda_handler.zip"
5}
6
7resource "aws_lambda_function" "workflow_starter" {
8 function_name = "glue-workflow-start"
9 filename = data.archive_file.lambda_zip.output_path
10 source_code_hash = data.archive_file.lambda_zip.output_base64sha256
11 handler = "lambda_handler.lambda_handler"
12 runtime = "python3.14"
13 role = aws_iam_role.lambda_glue_invoker_role.arn
14 timeout = 60
15 environment {
16 variables = {
17 GLUE_WORKFLOW_NAME = aws_glue_workflow.convert_workflow.name
18 }
19 }
20}
21
22resource "aws_iam_role" "lambda_glue_invoker_role" {
23 name = "lambda-glue-invoker"
24
25 assume_role_policy = jsonencode({
26 Version = "2012-10-17"
27 Statement = [{
28 Action = "sts:AssumeRole"
29 Effect = "Allow"
30 Principal = { Service = "lambda.amazonaws.com" }
31 }]
32 })
33}
34
35resource "aws_iam_policy" "lambda_glue_policy" {
36 name = "lambda-glue-invoker"
37
38 policy = jsonencode({
39 Version = "2012-10-17"
40 Statement = [
41 {
42 Action = [
43 "logs:CreateLogGroup",
44 "logs:CreateLogStream",
45 "logs:PutLogEvents"
46 ]
47 Effect = "Allow"
48 Resource = "arn:aws:logs:*:*:*"
49 },
50 {
51 Effect = "Allow"
52 Action = "glue:StartWorkflowRun"
53 Resource = aws_glue_workflow.convert_workflow.arn
54 }
55 ]
56 })
57}
58
59resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
60 role = aws_iam_role.lambda_glue_invoker_role.name
61 policy_arn = aws_iam_policy.lambda_glue_policy.arn
62}
The Lambda function itself. The code was written by an AI, please don't judge too harshly; it does its job and I'm happy with it.
import json
import os
import boto3

WORKFLOW_NAME = os.environ.get('GLUE_WORKFLOW_NAME')

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    if 'detail' not in event or 'bucket' not in event['detail'] or 'object' not in event['detail']:
        print("Event structure missing S3 details. Exiting.")
        return {'statusCode': 400, 'body': 'Invalid event structure'}

    s3_bucket = event['detail']['bucket']['name']
    s3_key = event['detail']['object']['key']

    if not s3_key.lower().endswith('.csv.gpg'):
        print(f"Skipping file: {s3_key}")
        return {'statusCode': 200, 'body': 'Skipped'}

    print(f"Starting workflow for s3://{s3_bucket}/{s3_key}")

    try:
        response = glue_client.start_workflow_run(
            Name=WORKFLOW_NAME,
            RunProperties={
                'input_bucket': s3_bucket,
                'input_key': s3_key
            }
        )
        print(f"Glue Workflow Run Started: {response['RunId']}")

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Workflow started', 'RunId': response['RunId']})
        }

    except Exception as e:
        print(f"Error starting Glue Workflow: {e}")
        raise
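To sanity-check the handler locally before deploying, you can feed it a stripped-down version of the EventBridge "Object Created" event; the handler only reads detail.bucket.name and detail.object.key, so that's all the sample needs. The bucket and key below are placeholders, and GLUE_WORKFLOW_NAME has to point at a real workflow (with AWS credentials configured) for the start_workflow_run call to actually go through:

# test_handler.py - a minimal local smoke test for the Lambda handler.
# The event mimics the shape of an EventBridge "Object Created"
# notification from S3; bucket and key values are placeholders.
import os

# must be set before importing: the handler reads it at import time
os.environ["GLUE_WORKFLOW_NAME"] = "csv-gpg-convert"

from lambda_handler import lambda_handler

sample_event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "raw-csv-gpg"},
        "object": {"key": "2024-01-01.csv.gpg"},
    },
}

# the handler never touches context, so None is fine here
print(lambda_handler(sample_event, None))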
Glue
Now let's create the AWS Glue pieces. We need the workflow itself, which we'll be starting, plus two jobs that handle decryption and conversion.
1resource "aws_glue_workflow" "convert_workflow" {
2 name = "csv-gpg-convert"
3}
4
5resource "aws_glue_job" "decrypt" {
6 name = "decrypt-gpg"
7 role_arn = aws_iam_role.glue_decrypt.arn
8 glue_version = "4.0"
9
10 command {
11 name = "pythonshell"
12 python_version = "3.9"
13 script_location = "s3://${aws_s3_bucket.scripts.bucket}/decrypt.py"
14 }
15
16 default_arguments = {
17 "--RAW_BUCKET" = aws_s3_bucket.raw.bucket
18 "--DECRYPTED_BUCKET" = aws_s3_bucket.decrypted.bucket
19 "--GPG_KEY_SECRET" = "CSV_GPG_CONVERT/KEY"
20 "--GPG_PASSPHRASE_SECRET" = "CSV_GPG_CONVERT/PASSPHRASE"
21 "--additional-python-modules" = "python-gnupg==0.5.5"
22 }
23}
24
25resource "aws_glue_job" "convert" {
26 name = "csv-to-parquet"
27 role_arn = aws_iam_role.glue_convert.arn
28 glue_version = "4.0"
29
30 command {
31 name = "glueetl"
32 script_location = "s3://${aws_s3_bucket.scripts.bucket}/convert.py"
33 python_version = "3"
34 }
35
36 default_arguments = {
37 "--DECRYPTED_BUCKET" = aws_s3_bucket.decrypted.bucket
38 "--PARQUET_BUCKET" = aws_s3_bucket.parquet.bucket
39 }
40}
41
42# Запускаем расшифровку по команде из лямбды
43resource "aws_glue_trigger" "start_decrypt" {
44 name = "trigger-start-decrypt"
45 type = "EVENT"
46 workflow_name = aws_glue_workflow.convert_workflow.name
47
48 actions {
49 job_name = aws_glue_job.decrypt.name
50 }
51}
52
53# Конвертация запустится, только в том случае если job decrypt перешла в статус SUCCEEDED
54resource "aws_glue_trigger" "convert" {
55 name = "trigger-convert"
56 type = "CONDITIONAL"
57 workflow_name = aws_glue_workflow.convert_workflow.name
58
59 predicate {
60 conditions {
61 job_name = aws_glue_job.decrypt.name
62 state = "SUCCEEDED"
63 }
64 }
65
66 actions {
67 job_name = aws_glue_job.convert.name
68 }
69}
70
71resource "aws_iam_role" "glue_decrypt" {
72 name = "glue-decrypt-role"
73
74 assume_role_policy = jsonencode({
75 Version = "2012-10-17"
76 Statement = [{
77 Effect = "Allow"
78 Principal = { Service = "glue.amazonaws.com" }
79 Action = "sts:AssumeRole"
80 }]
81 })
82}
83
84resource "aws_iam_role_policy" "glue_decrypt_policy" {
85 role = aws_iam_role.glue_decrypt.name
86
87 policy = jsonencode({
88 Version = "2012-10-17"
89 Statement = [
90 {
91 Effect = "Allow"
92 Action = ["s3:ListBucket"]
93 Resource = [
94 "${aws_s3_bucket.raw.arn}",
95 "${aws_s3_bucket.decrypted.arn}"
96 ]
97 },
98 {
99 Effect = "Allow"
100 Action = ["s3:GetObject", "s3:PutObject"]
101 Resource = [
102 "${aws_s3_bucket.raw.arn}/*",
103 "${aws_s3_bucket.decrypted.arn}/*",
104 "${aws_s3_bucket.scripts.arn}/*"
105 ]
106 },
107 {
108 Effect = "Allow"
109 Action = ["secretsmanager:GetSecretValue"]
110 Resource = "arn:aws:secretsmanager:${var.region}:${data.aws_caller_identity.current.account_id}:secret:CSV_GPG_CONVERT*"
111 },
112 {
113 Effect = "Allow"
114 Action = ["glue:GetWorkflowRunProperties"]
115 Resource = [aws_glue_workflow.convert_workflow.arn]
116 },
117 {
118 Action = [
119 "logs:CreateLogGroup",
120 "logs:CreateLogStream",
121 "logs:PutLogEvents"
122 ]
123 Effect = "Allow"
124 Resource = "arn:aws:logs:*:*:*"
125 }
126 ]
127 })
128}
129
130resource "aws_iam_role" "glue_convert" {
131 name = "glue-convert-role"
132
133 assume_role_policy = jsonencode({
134 Version = "2012-10-17"
135 Statement = [{
136 Effect = "Allow"
137 Principal = { Service = "glue.amazonaws.com" }
138 Action = "sts:AssumeRole"
139 }]
140 })
141}
142
143resource "aws_iam_role_policy" "glue_convert_policy" {
144 name = "glue-convert-policy"
145 role = aws_iam_role.glue_convert.name
146
147 policy = jsonencode({
148 Version = "2012-10-17"
149 Statement = [
150 {
151 Effect = "Allow"
152 Action = ["s3:ListBucket"]
153 Resource = [
154 "${aws_s3_bucket.decrypted.arn}",
155 "${aws_s3_bucket.parquet.arn}",
156 "${aws_s3_bucket.scripts.arn}"
157 ]
158 },
159 {
160 Effect = "Allow"
161 Action = ["s3:GetObject"]
162 Resource = [
163 "${aws_s3_bucket.decrypted.arn}/*",
164 "${aws_s3_bucket.scripts.arn}/*"
165 ]
166 },
167 {
168 Effect = "Allow"
169 Action = ["s3:PutObject"]
170 Resource = ["${aws_s3_bucket.parquet.arn}/*"]
171 },
172 {
173 Action = [
174 "logs:CreateLogGroup",
175 "logs:CreateLogStream",
176 "logs:PutLogEvents"
177 ]
178 Effect = "Allow"
179 Resource = "arn:aws:logs:*:*:*"
180 },
181 {
182 Effect = "Allow"
183 Action = ["glue:GetWorkflowRunProperties"]
184 Resource = [aws_glue_workflow.convert_workflow.arn]
185 }
186 ]
187 })
188}
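One thing the Terraform above does not create is the two secrets themselves, CSV_GPG_CONVERT/KEY and CSV_GPG_CONVERT/PASSPHRASE. A minimal boto3 sketch for seeding them one-off; the key file path and the passphrase value are placeholders, and the key is assumed to be an ASCII-armored export:

# seed_secrets.py - one-off helper to create the two secrets the
# decrypt job reads. File path and passphrase are placeholders.
import boto3

secrets = boto3.client("secretsmanager")

# ASCII-armored private key, e.g. exported with:
#   gpg --armor --export-secret-keys KEY_ID > private.asc
with open("private.asc") as f:
    private_key = f.read()

secrets.create_secret(Name="CSV_GPG_CONVERT/KEY", SecretString=private_key)
secrets.create_secret(Name="CSV_GPG_CONVERT/PASSPHRASE", SecretString="my-passphrase")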
I'm also attaching the decrypt.py and convert.py files that do the main work. This code was also written by an AI; again, please don't judge too harshly.
convert.py
import sys
import os
import json
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "DECRYPTED_BUCKET",
        "PARQUET_BUCKET",
        "WORKFLOW_NAME",
        "WORKFLOW_RUN_ID"
    ]
)

workflow_name = args["WORKFLOW_NAME"]
workflow_run_id = args["WORKFLOW_RUN_ID"]
decrypted_bucket = args["DECRYPTED_BUCKET"]
parquet_bucket = args["PARQUET_BUCKET"]

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glue_client = boto3.client("glue")

try:
    resp = glue_client.get_workflow_run_properties(
        Name=workflow_name,
        RunId=workflow_run_id
    )
    run_props = resp.get("RunProperties", {})
    print("Workflow Run Properties:")
    print(json.dumps(run_props, indent=2))

except Exception as e:
    print(f"[FATAL] Failed to read Workflow Run Properties: {e}")
    raise

input_key = run_props.get("input_key")

if not input_key:
    raise Exception(
        "[CRITICAL] Missing 'input_key' in Workflow Run Properties. "
        "Check that the Lambda set it when starting the workflow run."
    )

print(f"Input file (encrypted source): {input_key}")

# decrypt.py uploads the decrypted object under its basename,
# so derive the key the same way here
csv_key = os.path.basename(input_key).replace(".gpg", "")
input_path = f"s3://{decrypted_bucket}/{csv_key}"

parquet_folder = csv_key.replace(".csv", "")
output_path = f"s3://{parquet_bucket}/{parquet_folder}/"

print(f"Read from: {input_path}")
print(f"Write to: {output_path}")

try:
    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(input_path)
    )

    df.write.mode("overwrite").parquet(output_path)

    print("\n✔ Conversion completed successfully.\n")

except Exception as e:
    print(f"[ERROR] Spark conversion error: {e}")
    raise

job.commit()
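To double-check the output, you can grab one part file from the parquet bucket and look at it locally. A small sketch assuming pyarrow is installed; the bucket name comes from the Terraform above, while the prefix is a hypothetical example of the folder the job derives from the file name:

# verify_parquet.py - download one part file from the output and
# inspect its schema. The prefix is a placeholder.
import boto3
import pyarrow.parquet as pq

s3 = boto3.client("s3")

resp = s3.list_objects_v2(Bucket="parquet", Prefix="2024-01-01/")
part_key = next(
    obj["Key"] for obj in resp.get("Contents", []) if obj["Key"].endswith(".parquet")
)

s3.download_file("parquet", part_key, "/tmp/part.parquet")

table = pq.read_table("/tmp/part.parquet")
print(table.schema)
print(table.num_rows, "rows")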
decrypt.py
import boto3
import os
import sys
import shutil
import gnupg
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, [
    "RAW_BUCKET",
    "DECRYPTED_BUCKET",
    "GPG_KEY_SECRET",
    "GPG_PASSPHRASE_SECRET",
    "WORKFLOW_NAME",
    "WORKFLOW_RUN_ID"
])

RAW_BUCKET = args["RAW_BUCKET"]
DECRYPTED_BUCKET = args["DECRYPTED_BUCKET"]
SECRET_KEY_NAME = args["GPG_KEY_SECRET"]
SECRET_PASSPHRASE_NAME = args["GPG_PASSPHRASE_SECRET"]
WORKFLOW_NAME = args['WORKFLOW_NAME']
RUN_ID = args['WORKFLOW_RUN_ID']

s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")
glue_client = boto3.client("glue")

GPG_HOME = "/tmp/gpg_home"

if os.path.exists(GPG_HOME):
    shutil.rmtree(GPG_HOME)

os.makedirs(GPG_HOME, exist_ok=True)

gpg = gnupg.GPG(gnupghome=GPG_HOME)

print("GPG configured at", GPG_HOME)

print("Loading GPG key...")
private_key_raw = secrets.get_secret_value(SecretId=SECRET_KEY_NAME)["SecretString"]

print("Loading passphrase...")
passphrase = secrets.get_secret_value(SecretId=SECRET_PASSPHRASE_NAME)["SecretString"]

private_key = private_key_raw.replace("\r", "").strip()

print("Importing key...")
import_result = gpg.import_keys(private_key)

if not import_result.count:
    raise Exception("Failed to import GPG key: " + str(import_result.stderr))

print("Key imported. Fingerprints:", import_result.fingerprints)


def decrypt_file(key):
    enc_name = os.path.basename(key)
    dec_name = enc_name.replace(".gpg", "")

    enc_path = f"/tmp/{enc_name}"
    dec_path = f"/tmp/{dec_name}"

    print(f"Downloading s3://{RAW_BUCKET}/{key}...")
    s3.download_file(RAW_BUCKET, key, enc_path)

    print(f"Decrypting {enc_name}...")
    with open(enc_path, "rb") as f:
        result = gpg.decrypt_file(f, passphrase=passphrase, output=dec_path)

    if not result.ok:
        print("GPG ERROR:", result.stderr)
        raise Exception("GPG decrypt failed")

    print("Upload decrypted:", dec_name)
    s3.upload_file(dec_path, DECRYPTED_BUCKET, dec_name)

    os.remove(enc_path)
    os.remove(dec_path)


def main():
    print(f"Fetching properties for Workflow: {WORKFLOW_NAME}, RunId: {RUN_ID}")

    try:
        resp = glue_client.get_workflow_run_properties(
            Name=WORKFLOW_NAME,
            RunId=RUN_ID
        )
        print("debug:", resp)

        run_props = resp['RunProperties']
        input_key = run_props.get('input_key')

        if not input_key:
            print("Warning: No 'input_key' found in RunProperties. Nothing to process.")
            return

        print(f"Triggered for file: {input_key}")
        decrypt_file(input_key)

    except Exception as e:
        print(f"Error processing workflow properties: {str(e)}")
        raise
    finally:
        # clean up even when decryption fails, so no key material lingers
        print("Cleaning up GPG home...")
        shutil.rmtree(GPG_HOME, ignore_errors=True)

    print("DONE")


if __name__ == "__main__":
    main()
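For an end-to-end check, you can encrypt a sample CSV with the public half of the key and drop it into the raw bucket; the upload should ripple through EventBridge, the Lambda, and both Glue jobs. A sketch assuming the public key is already in your local GPG keyring; the recipient id is a placeholder:

# e2e_test.py - encrypt a sample csv and upload it to the raw bucket
# to kick off the whole pipeline. The recipient id is a placeholder
# for whatever key pair the secrets in Secrets Manager belong to.
import boto3
import gnupg

gpg = gnupg.GPG()
RECIPIENT = "pipeline@example.com"  # hypothetical key id

with open("sample.csv", "rb") as f:
    encrypted = gpg.encrypt_file(
        f, recipients=[RECIPIENT], output="sample.csv.gpg",
        always_trust=True,  # skip the trust check for this test key
    )

if not encrypted.ok:
    raise RuntimeError(f"GPG encryption failed: {encrypted.status}")

boto3.client("s3").upload_file("sample.csv.gpg", "raw-csv-gpg", "sample.csv.gpg")
print("Uploaded; the Glue workflow should kick off shortly.")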
And that's basically it.