AWS Glue: Decrypting GPG and Converting CSV to Parquet

I was handed a task: accept GPG-encrypted CSVs, decrypt them with a private key and passphrase stored in AWS Secrets Manager, and immediately convert them to Parquet, so that I could later point AWS Athena at the result and not think about transformations, partitions, and so on. Everything should kick off on its own whenever a new file lands in S3.

S3

Since everything lives in AWS, I arm myself with Terraform and go create four buckets.

  • raw - csv.gpg files are uploaded straight here, and events from this bucket kick off the conversion process
  • decrypted - decrypted files land in this bucket
  • parquet - this bucket stores the files converted from CSV to Parquet
  • scripts - this bucket is for AWS Glue; it holds the code Glue runs for decryption and conversion
resource "aws_s3_bucket" "raw" {
  bucket = "raw-csv-gpg"
}

# Send events to EventBridge so we can catch object creation
resource "aws_s3_bucket_notification" "raw" {
  bucket      = aws_s3_bucket.raw.bucket
  eventbridge = true
}

resource "aws_s3_bucket" "decrypted" {
  bucket = "decrypted-csv"
}

resource "aws_s3_bucket" "parquet" {
  bucket = "parquet"
}

resource "aws_s3_bucket" "scripts" {
  bucket = "glue-scripts"
}

# For simplicity, just upload the ready-made Python files to the scripts bucket
resource "aws_s3_object" "decrypt_script" {
  bucket = aws_s3_bucket.scripts.bucket
  key    = "decrypt.py"
  source = "${path.module}/src/decrypt.py"
}

resource "aws_s3_object" "convert_script" {
  bucket = aws_s3_bucket.scripts.bucket
  key    = "convert.py"
  source = "${path.module}/src/convert.py"
}
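
Once this is applied, dropping any .csv.gpg object into raw is what sets the whole chain off. A quick way to test it end to end (the file name and key are made up):

import boto3

# Uploading a test file; this emits the "Object Created" event
# that the EventBridge rule below matches
boto3.client("s3").upload_file(
    "2024-01-01.csv.gpg",             # hypothetical local file
    "raw-csv-gpg",
    "exports/2024-01-01.csv.gpg",     # hypothetical object key
)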

EventBridge

We create an EventBridge rule that watches the raw bucket: when it sees a file with the .csv.gpg suffix uploaded, it invokes a Lambda, which in turn starts the workflow. I can foresee the question: "EventBridge can start a workflow by itself, why did you add a Lambda?" My answer: I couldn't find a way to tell Glue, as a parameter, which file to pick up and process. And without that parameter, every run would process all the files sitting in the bucket, which is what I wanted to avoid.

resource "aws_cloudwatch_event_rule" "s3_upload" {
  name = "s3-csv-gpg-trigger"

  event_pattern = jsonencode({
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
      "bucket": {
        "name": [aws_s3_bucket.raw.bucket]
      },
      "object": {
        "key": [{ "suffix": "csv.gpg" }]
      }
    }
  })
}

resource "aws_cloudwatch_event_target" "lambda_target" {
  rule      = aws_cloudwatch_event_rule.s3_upload.name
  target_id = "InvokeLambdaStarter"
  arn       = aws_lambda_function.workflow_starter.arn
}

resource "aws_lambda_permission" "allow_cloudwatch" {
  statement_id  = "AllowExecutionFromCloudWatch"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.workflow_starter.function_name
  principal     = "events.amazonaws.com"
  source_arn    = aws_cloudwatch_event_rule.s3_upload.arn
}
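
The suffix matcher can be verified without uploading anything: EventBridge has a TestEventPattern API. A sketch (the envelope fields are the minimum EventBridge requires for the check; all values are made up):

import json
import boto3

events = boto3.client("events")

pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["raw-csv-gpg"]},
        "object": {"key": [{"suffix": "csv.gpg"}]},
    },
}

# A minimal synthetic event; EventBridge insists on these top-level fields
sample = {
    "id": "0",
    "account": "123456789012",
    "source": "aws.s3",
    "time": "2024-01-01T00:00:00Z",
    "region": "eu-west-1",
    "resources": [],
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "raw-csv-gpg"},
        "object": {"key": "exports/2024-01-01.csv.gpg"},
    },
}

print(events.test_event_pattern(
    EventPattern=json.dumps(pattern),
    Event=json.dumps(sample),
)["Result"])  # expect True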

Lambda

A simple Lambda that listens to EventBridge, sees the S3 event saying a file was created, pulls the file name out of the event, and starts the Glue workflow.

data "archive_file" "lambda_zip" {
  type        = "zip"
  source_file = "lambda/lambda_handler.py"
  output_path = "lambda/lambda_handler.zip"
}

resource "aws_lambda_function" "workflow_starter" {
  function_name    = "glue-workflow-start"
  filename         = data.archive_file.lambda_zip.output_path
  source_code_hash = data.archive_file.lambda_zip.output_base64sha256
  handler          = "lambda_handler.lambda_handler"
  runtime          = "python3.14"
  role             = aws_iam_role.lambda_glue_invoker_role.arn
  timeout          = 60

  environment {
    variables = {
      GLUE_WORKFLOW_NAME = aws_glue_workflow.convert_workflow.name
    }
  }
}

resource "aws_iam_role" "lambda_glue_invoker_role" {
  name = "lambda-glue-invoker"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Action    = "sts:AssumeRole"
      Effect    = "Allow"
      Principal = { Service = "lambda.amazonaws.com" }
    }]
  })
}

resource "aws_iam_policy" "lambda_glue_policy" {
  name = "lambda-glue-invoker"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Effect   = "Allow"
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect   = "Allow"
        Action   = "glue:StartWorkflowRun"
        Resource = aws_glue_workflow.convert_workflow.arn
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  role       = aws_iam_role.lambda_glue_invoker_role.name
  policy_arn = aws_iam_policy.lambda_glue_policy.arn
}

The Lambda function itself. The code was written by an AI, so please don't judge too harshly; it does its job and I'm happy with it.

import json
import os
import boto3

WORKFLOW_NAME = os.environ.get('GLUE_WORKFLOW_NAME')

glue_client = boto3.client('glue')

def lambda_handler(event, context):
    print("Received event: " + json.dumps(event, indent=2))

    if 'detail' not in event or 'bucket' not in event['detail'] or 'object' not in event['detail']:
        print("Event structure missing S3 details. Exiting.")
        return {'statusCode': 400, 'body': 'Invalid event structure'}

    s3_bucket = event['detail']['bucket']['name']
    s3_key = event['detail']['object']['key']

    if not s3_key.lower().endswith('.csv.gpg'):
        print(f"Skipping file: {s3_key}")
        return {'statusCode': 200, 'body': 'Skipped'}

    print(f"Starting workflow for s3://{s3_bucket}/{s3_key}")

    try:
        # Pass the file location to Glue via workflow run properties;
        # the jobs read these back with GetWorkflowRunProperties
        response = glue_client.start_workflow_run(
            Name=WORKFLOW_NAME,
            RunProperties={
                'input_bucket': s3_bucket,
                'input_key': s3_key
            }
        )
        print(f"Glue Workflow Run Started: {response['RunId']}")

        return {
            'statusCode': 200,
            'body': json.dumps({'message': 'Workflow started', 'RunId': response['RunId']})
        }

    except Exception as e:
        print(f"Error starting Glue Workflow: {e}")
        raise
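
The handler is easy to smoke-test locally with a trimmed event of the same shape. A sketch, run from the lambda/ directory, assuming AWS credentials and a region are configured and the workflow exists (the key is made up):

import os

# Must be set before the import: the module reads it at import time
os.environ["GLUE_WORKFLOW_NAME"] = "csv-gpg-convert"

import lambda_handler

event = {
    "source": "aws.s3",
    "detail-type": "Object Created",
    "detail": {
        "bucket": {"name": "raw-csv-gpg"},
        "object": {"key": "exports/2024-01-01.csv.gpg"},  # hypothetical key
    },
}

print(lambda_handler.lambda_handler(event, None))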

Glue

Now we set up AWS Glue. We need the workflow itself, which we will be starting, and two jobs that handle the decryption and the conversion.

resource "aws_glue_workflow" "convert_workflow" {
  name = "csv-gpg-convert"
}

resource "aws_glue_job" "decrypt" {
  name         = "decrypt-gpg"
  role_arn     = aws_iam_role.glue_decrypt.arn
  glue_version = "3.0"

  command {
    name            = "pythonshell"
    python_version  = "3.9"
    script_location = "s3://${aws_s3_bucket.scripts.bucket}/decrypt.py"
  }

  default_arguments = {
    "--RAW_BUCKET"                = aws_s3_bucket.raw.bucket
    "--DECRYPTED_BUCKET"          = aws_s3_bucket.decrypted.bucket
    "--GPG_KEY_SECRET"            = "CSV_GPG_CONVERT/KEY"
    "--GPG_PASSPHRASE_SECRET"     = "CSV_GPG_CONVERT/PASSPHRASE"
    "--additional-python-modules" = "python-gnupg==0.5.5"
  }
}

resource "aws_glue_job" "convert" {
  name         = "csv-to-parquet"
  role_arn     = aws_iam_role.glue_convert.arn
  glue_version = "4.0"

  command {
    name            = "glueetl"
    script_location = "s3://${aws_s3_bucket.scripts.bucket}/convert.py"
    python_version  = "3"
  }

  default_arguments = {
    "--DECRYPTED_BUCKET" = aws_s3_bucket.decrypted.bucket
    "--PARQUET_BUCKET"   = aws_s3_bucket.parquet.bucket
  }
}

# Start the decryption job on command from the Lambda
resource "aws_glue_trigger" "start_decrypt" {
  name          = "trigger-start-decrypt"
  type          = "EVENT"
  workflow_name = aws_glue_workflow.convert_workflow.name

  actions {
    job_name = aws_glue_job.decrypt.name
  }
}

# Conversion runs only if the decrypt job reached the SUCCEEDED state
resource "aws_glue_trigger" "convert" {
  name          = "trigger-convert"
  type          = "CONDITIONAL"
  workflow_name = aws_glue_workflow.convert_workflow.name

  predicate {
    conditions {
      job_name = aws_glue_job.decrypt.name
      state    = "SUCCEEDED"
    }
  }

  actions {
    job_name = aws_glue_job.convert.name
  }
}

resource "aws_iam_role" "glue_decrypt" {
  name = "glue-decrypt-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "glue_decrypt_policy" {
  role = aws_iam_role.glue_decrypt.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = ["s3:ListBucket"]
        Resource = [
          aws_s3_bucket.raw.arn,
          aws_s3_bucket.decrypted.arn
        ]
      },
      {
        Effect = "Allow"
        Action = ["s3:GetObject", "s3:PutObject"]
        Resource = [
          "${aws_s3_bucket.raw.arn}/*",
          "${aws_s3_bucket.decrypted.arn}/*",
          "${aws_s3_bucket.scripts.arn}/*"
        ]
      },
      {
        Effect   = "Allow"
        Action   = ["secretsmanager:GetSecretValue"]
        Resource = "arn:aws:secretsmanager:${var.region}:${data.aws_caller_identity.current.account_id}:secret:CSV_GPG_CONVERT*"
      },
      {
        Effect   = "Allow"
        Action   = ["glue:GetWorkflowRunProperties"]
        Resource = [aws_glue_workflow.convert_workflow.arn]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

resource "aws_iam_role" "glue_convert" {
  name = "glue-convert-role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Effect    = "Allow"
      Principal = { Service = "glue.amazonaws.com" }
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy" "glue_convert_policy" {
  name = "glue-convert-policy"
  role = aws_iam_role.glue_convert.name

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Effect = "Allow"
        Action = ["s3:ListBucket"]
        Resource = [
          aws_s3_bucket.decrypted.arn,
          aws_s3_bucket.parquet.arn,
          aws_s3_bucket.scripts.arn
        ]
      },
      {
        Effect = "Allow"
        Action = ["s3:GetObject"]
        Resource = [
          "${aws_s3_bucket.decrypted.arn}/*",
          "${aws_s3_bucket.scripts.arn}/*"
        ]
      },
      {
        Effect   = "Allow"
        Action   = ["s3:PutObject"]
        Resource = ["${aws_s3_bucket.parquet.arn}/*"]
      },
      {
        Effect = "Allow"
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents"
        ]
        Resource = "arn:aws:logs:*:*:*"
      },
      {
        Effect   = "Allow"
        Action   = ["glue:GetWorkflowRunProperties"]
        Resource = [aws_glue_workflow.convert_workflow.arn]
      }
    ]
  })
}
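
To check that the RunProperties set by the Lambda actually reach a run, it's handy to poke the workflow from a shell. A read-only boto3 sketch (the workflow name matches the resource above):

import boto3

glue = boto3.client("glue")

# Look at the most recent runs and the properties each one carries
runs = glue.get_workflow_runs(Name="csv-gpg-convert", MaxResults=5)
for run in runs["Runs"]:
    props = glue.get_workflow_run_properties(
        Name="csv-gpg-convert",
        RunId=run["WorkflowRunId"],
    )["RunProperties"]
    print(run["WorkflowRunId"], run["Status"], props)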

I'm also attaching the decrypt.py and convert.py files that do the main work. This code was likewise written by an AI, so again, please don't judge too harshly.

convert.py

import sys
import os
import json
import boto3
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "DECRYPTED_BUCKET",
        "PARQUET_BUCKET",
        "WORKFLOW_NAME",
        "WORKFLOW_RUN_ID"
    ]
)

workflow_name = args["WORKFLOW_NAME"]
workflow_run_id = args["WORKFLOW_RUN_ID"]
decrypted_bucket = args["DECRYPTED_BUCKET"]
parquet_bucket = args["PARQUET_BUCKET"]

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glue_client = boto3.client("glue")

try:
    resp = glue_client.get_workflow_run_properties(
        Name=workflow_name,
        RunId=workflow_run_id
    )
    run_props = resp.get("RunProperties", {})
    print("Workflow Run Properties:")
    print(json.dumps(run_props, indent=2))

except Exception as e:
    print(f"[FATAL] Failed to read Workflow Run Properties: {e}")
    raise

input_key = run_props.get("input_key")

if not input_key:
    raise Exception(
        "[CRITICAL] Missing 'input_key' in Workflow Run Properties. "
        "Check that the starter Lambda sets it when calling StartWorkflowRun."
    )

print(f"Input file (encrypted source): {input_key}")

# The decrypt job uploads the decrypted file under its basename,
# so the lookup here has to mirror that
csv_key = os.path.basename(input_key).replace(".gpg", "")
input_path = f"s3://{decrypted_bucket}/{csv_key}"

parquet_folder = csv_key.replace(".csv", "")
output_path = f"s3://{parquet_bucket}/{parquet_folder}/"

print(f"Read from:  {input_path}")
print(f"Write to:   {output_path}")

try:
    df = (
        spark.read
            .option("header", "true")
            .option("inferSchema", "true")
            .csv(input_path)
    )

    df.write.mode("overwrite").parquet(output_path)

    print("\n✔ Conversion completed successfully.\n")

except Exception as e:
    print(f"[ERROR] Spark conversion error: {e}")
    raise

job.commit()
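
The job deliberately writes unpartitioned Parquet, since the whole point was not to think about partitions. If partitions for Athena ever become worth it, the natural extension would be to swap the df.write line above for a partitioned write. A sketch (event_date is a hypothetical column that would have to exist in the CSV):

# Hypothetical: partition output by an event_date column from the CSV,
# producing s3://<parquet-bucket>/<folder>/event_date=.../part-*.parquet
(
    df.write
      .mode("overwrite")
      .partitionBy("event_date")
      .parquet(output_path)
)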

decrypt.py

import boto3
import os
import sys
import shutil
import gnupg
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, [
    "RAW_BUCKET",
    "DECRYPTED_BUCKET",
    "GPG_KEY_SECRET",
    "GPG_PASSPHRASE_SECRET",
    "WORKFLOW_NAME",
    "WORKFLOW_RUN_ID"
])

RAW_BUCKET = args["RAW_BUCKET"]
DECRYPTED_BUCKET = args["DECRYPTED_BUCKET"]
SECRET_KEY_NAME = args["GPG_KEY_SECRET"]
SECRET_PASSPHRASE_NAME = args["GPG_PASSPHRASE_SECRET"]
WORKFLOW_NAME = args['WORKFLOW_NAME']
RUN_ID = args['WORKFLOW_RUN_ID']

s3 = boto3.client("s3")
secrets = boto3.client("secretsmanager")
glue_client = boto3.client("glue")

GPG_HOME = "/tmp/gpg_home"

# Start from a clean keyring on every run
if os.path.exists(GPG_HOME):
    shutil.rmtree(GPG_HOME)

os.makedirs(GPG_HOME, exist_ok=True)

gpg = gnupg.GPG(gnupghome=GPG_HOME)

print("GPG configured at", GPG_HOME)

print("Loading GPG key...")
private_key_raw = secrets.get_secret_value(SecretId=SECRET_KEY_NAME)["SecretString"]

print("Loading passphrase...")
passphrase = secrets.get_secret_value(SecretId=SECRET_PASSPHRASE_NAME)["SecretString"]

private_key = private_key_raw.replace("\r", "").strip()

print("Importing key...")
import_result = gpg.import_keys(private_key)

if not import_result.count:
    raise Exception("Failed to import GPG key: " + str(import_result.stderr))

print("Key imported. Fingerprints:", import_result.fingerprints)


def decrypt_file(key):
    # Note: the decrypted object is uploaded under its basename,
    # so any prefix on the source key is dropped
    enc_name = os.path.basename(key)
    dec_name = enc_name.replace(".gpg", "")

    enc_path = f"/tmp/{enc_name}"
    dec_path = f"/tmp/{dec_name}"

    print(f"Downloading s3://{RAW_BUCKET}/{key}...")
    s3.download_file(RAW_BUCKET, key, enc_path)

    print(f"Decrypting {enc_name}...")
    with open(enc_path, "rb") as f:
        result = gpg.decrypt_file(f, passphrase=passphrase, output=dec_path)

    if not result.ok:
        print("GPG ERROR:", result.stderr)
        raise Exception("GPG decrypt failed")

    print("Upload decrypted:", dec_name)
    s3.upload_file(dec_path, DECRYPTED_BUCKET, dec_name)

    os.remove(enc_path)
    os.remove(dec_path)


def main():
    print(f"Fetching properties for Workflow: {WORKFLOW_NAME}, RunId: {RUN_ID}")

    try:
        resp = glue_client.get_workflow_run_properties(
            Name=WORKFLOW_NAME,
            RunId=RUN_ID
        )
        print("debug:", resp)

        run_props = resp['RunProperties']
        input_key = run_props.get('input_key')

        if not input_key:
            print("Warning: No 'input_key' found in RunProperties. Nothing to process.")
            return

        print(f"Triggered for file: {input_key}")
        decrypt_file(input_key)

    except Exception as e:
        print(f"Error processing workflow properties: {str(e)}")
        raise

    print("Cleaning up GPG home...")
    shutil.rmtree(GPG_HOME, ignore_errors=True)

    print("DONE")


if __name__ == "__main__":
    main()
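
Both secrets are assumed to exist before the first run. A one-off seeding sketch (the secret names match the job arguments above; the key file path and passphrase are placeholders):

import boto3

secrets = boto3.client("secretsmanager")

# ASCII-armored private key, e.g. exported with:
#   gpg --armor --export-secret-keys <key-id>
with open("private-key.asc") as f:  # hypothetical local path
    secrets.create_secret(Name="CSV_GPG_CONVERT/KEY", SecretString=f.read())

secrets.create_secret(
    Name="CSV_GPG_CONVERT/PASSPHRASE",
    SecretString="placeholder-passphrase",
)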

And that's basically it.
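
As for the Athena part from the intro: since the Parquet is already in place, a plain external table over an output folder is enough to start querying. A sketch via boto3 (the database, column list, and results bucket are assumptions that depend on your data):

import boto3

athena = boto3.client("athena")

# Hypothetical table over one converted file's output folder;
# the column list must match the CSV header
ddl = """
CREATE EXTERNAL TABLE IF NOT EXISTS csv_export (
  id string,
  amount double
)
STORED AS PARQUET
LOCATION 's3://parquet/2024-01-01/'
"""

athena.start_query_execution(
    QueryString=ddl,
    QueryExecutionContext={"Database": "default"},
    ResultConfiguration={"OutputLocation": "s3://athena-results-bucket/"},
)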