Skip to content

Commit 1fae1c9

Browse files
authored
Jobs, buckets and sagemaker infrastructure (#49)
* jobs, buckets and sagemaker infrastructure * Anomaly detection infrastructure * script change in infrastructure * default quantile change
1 parent 85ae65c commit 1fae1c9

14 files changed

Lines changed: 488 additions & 29 deletions

File tree

infrastructure/main.tf

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ module "s3_buckets" {
33
}
44

55
module "iam" {
6-
source = "./modules/iam"
7-
glue_role_name = var.glue_role_name
6+
source = "./modules/iam"
7+
glue_role_name = var.glue_role_name
8+
sagemaker_execution_role_name = var.sagemaker_execution_role_name
89
}
910

1011
module "glue_catalog" {
@@ -14,22 +15,23 @@ module "glue_catalog" {
1415
bdp_wallets_aggregations_bucket = module.s3_buckets.bdp_wallets_aggregations_bucket
1516
bdp_scaled_features_bucket = module.s3_buckets.bdp_scaled_features_bucket
1617
bdp_unscaled_features_bucket = module.s3_buckets.bdp_unscaled_features_bucket
18+
bdp_anomaly_detection_bucket = module.s3_buckets.bdp_anomaly_detection_bucket
1719
}
1820

1921
module "iam_github_role" {
2022
source = "./modules/iam_github_role"
2123
github_role_name = var.github_role_name
22-
glue_script_bucket = module.s3_buckets.glue_scripts_bucket
24+
glue_script_bucket = module.s3_buckets.bdp_glue_scripts_bucket
2325
}
2426

2527
module "iam_github_user" {
2628
source = "./modules/iam_github_user"
27-
glue_script_bucket = module.s3_buckets.glue_scripts_bucket
29+
glue_script_bucket = module.s3_buckets.bdp_glue_scripts_bucket
2830
}
2931

3032
module "glue_jobs" {
3133
source = "./modules/glue_jobs"
32-
glue_script_bucket = module.s3_buckets.glue_scripts_bucket
34+
glue_script_bucket = module.s3_buckets.bdp_glue_scripts_bucket
3335
glue_role_arn = module.iam.glue_role_arn
3436
default_arguments = var.glue_jobs_default_arguments
3537
}
@@ -40,4 +42,9 @@ module "glue_workflows" {
4042
transactions_cleaning_job_name = module.glue_jobs.transactions_cleaning_job_name
4143
wallets_aggregations_job_name = module.glue_jobs.wallets_aggregations_job_name
4244
feature_scaling_job_name = module.glue_jobs.feature_scaling_job_name
45+
}
46+
47+
module "sagemaker_notebooks" {
48+
source = "./modules/sagemaker_notebooks"
49+
sagemaker_execution_role_arn = module.iam.sagemaker_execution_role_arn
4350
}

infrastructure/modules/glue_catalog/main.tf

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ resource "aws_glue_catalog_database" "bdp_db" {
1717
name = "network_name"
1818
type = "string"
1919
}
20+
partition_keys {
21+
name = "day(block_timestamp)"
22+
type = "timestamp"
23+
}
2024
2125
parameters = {
2226
"write.format.default" = "parquet"
@@ -137,6 +141,10 @@ resource "aws_glue_catalog_table_optimizer" "cleaned_transactions_compaction_opt
137141
name = "network_name"
138142
type = "string"
139143
}
144+
partition_keys {
145+
name = "day(last_transaction_timestamp)"
146+
type = "timestamp"
147+
}
140148
141149
parameters = {
142150
"write.format.default" = "parquet",
@@ -1014,6 +1022,104 @@ resource "aws_glue_catalog_table_optimizer" "unscaled_features_compaction_optimi
10141022
table_name = "unscaled_features"
10151023
type = "compaction"
10161024

1025+
configuration {
1026+
role_arn = var.glue_role_arn
1027+
enabled = true
1028+
}
1029+
}
1030+
1031+
/*resource "aws_glue_catalog_table" "anomaly_detection" {
1032+
database_name = aws_glue_catalog_database.bdp_db.name
1033+
name = "anomaly_detection"
1034+
table_type = "EXTERNAL_TABLE"
1035+
1036+
open_table_format_input {
1037+
iceberg_input {
1038+
metadata_operation = "CREATE"
1039+
}
1040+
}
1041+
//Commented because https://github.com/hashicorp/terraform-provider-aws/issues/36531
1042+
partition_keys {
1043+
name = "network_name"
1044+
type = "string"
1045+
}
1046+
1047+
partition_keys {
1048+
name = "day(block_timestamp_unscaled)"
1049+
type = "timestamp"
1050+
}
1051+
1052+
parameters = {
1053+
"write.format.default" = "parquet"
1054+
"write.parquet.compression-codec" = "zstd"
1055+
}
1056+
1057+
storage_descriptor {
1058+
location = "s3://${var.bdp_anomaly_detection_bucket}"
1059+
input_format = "org.apache.hadoop.mapred.FileInputFormat"
1060+
output_format = "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"
1061+
compressed = true
1062+
1063+
ser_de_info {
1064+
name = "anomaly_detection_serde"
1065+
serialization_library = "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
1066+
}
1067+
1068+
1069+
columns {
1070+
name = "transaction_hash"
1071+
type = "string"
1072+
}
1073+
columns {
1074+
name = "sender_address"
1075+
type = "string"
1076+
}
1077+
columns {
1078+
name = "receiver_address"
1079+
type = "string"
1080+
}
1081+
columns {
1082+
name = "block_timestamp_unscaled"
1083+
type = "timestamp"
1084+
}
1085+
columns {
1086+
name = "network_name"
1087+
type = "string"
1088+
}
1089+
columns {
1090+
name = "is_anomaly"
1091+
type = "boolean"
1092+
}
1093+
1094+
}
1095+
}
1096+
}*/
1097+
1098+
resource "aws_glue_catalog_table_optimizer" "anomaly_detection_orphan_files_deletion_optimizer" {
1099+
catalog_id = "982534349340"
1100+
database_name = aws_glue_catalog_database.bdp_db.name
1101+
table_name = "anomaly_detection"
1102+
type = "orphan_file_deletion"
1103+
1104+
configuration {
1105+
role_arn = var.glue_role_arn
1106+
enabled = true
1107+
1108+
orphan_file_deletion_configuration {
1109+
iceberg_configuration {
1110+
orphan_file_retention_period_in_days = 2
1111+
location = "s3://${var.bdp_anomaly_detection_bucket}"
1112+
}
1113+
}
1114+
}
1115+
}
1116+
1117+
resource "aws_glue_catalog_table_optimizer" "anomaly_detection_compaction_optimizer" {
1118+
catalog_id = "982534349340"
1119+
database_name = aws_glue_catalog_database.bdp_db.name
1120+
table_name = "anomaly_detection"
1121+
type = "compaction"
1122+
10171123
configuration {
10181124
role_arn = var.glue_role_arn
10191125
enabled = true

infrastructure/modules/glue_catalog/variables.tf

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ variable "bdp_unscaled_features_bucket" {
1818
description = "Unscaled features bucket name"
1919
}
2020

21+
variable "bdp_anomaly_detection_bucket" {
22+
type = string
23+
description = "Anomaly detection bucket name"
24+
}
25+
2126
variable "glue_role_arn" {
2227
type = string
2328
description = "ARN of IAM role for Glue"

infrastructure/modules/glue_jobs/main.tf

Lines changed: 116 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,24 @@
11
locals {
22
transactions_cleaning_arguments = {
3-
"--END_DATE" = "2024-11-30"
4-
"--START_DATE" = "2024-11-1"
3+
"--END_DATE" = "2024-12-31"
4+
"--START_DATE" = "2024-10-1"
55
"--NETWORK_PREFIX" = "all"
66
}
77

8-
transactions_cleaning_final_arguments = merge(
9-
var.default_arguments,
10-
local.transactions_cleaning_arguments
11-
)
8+
iceberg_argument = {
9+
"--datalake-formats" = "iceberg"
10+
}
11+
12+
converting_to_recordio_arguments = {
13+
"--extra-jars" = "s3://bdp-glue-scripts/sagemaker-spark_2.12-spark_3.3.0-1.4.6.dev0.jar"
14+
"--python-modules-installer-option" = "-r"
15+
"--additional-python-modules" = "s3://bdp-glue-scripts/requirements.txt"
16+
}
17+
18+
anomaly_classification_arguments = {
19+
"--QUANTILE" = 0.99
20+
}
21+
1222
}
1323

1424
resource "aws_glue_job" "transactions_cleaning" {
@@ -20,10 +30,10 @@ resource "aws_glue_job" "transactions_cleaning" {
2030
python_version = "3"
2131
}
2232

23-
worker_type = "G.1X"
33+
worker_type = "G.2X"
2434
number_of_workers = 10
2535
glue_version = "5.0"
26-
default_arguments = local.transactions_cleaning_final_arguments
36+
default_arguments = merge(var.default_arguments, local.iceberg_argument, local.transactions_cleaning_arguments)
2737
timeout = 120
2838
}
2939

@@ -37,10 +47,10 @@ resource "aws_glue_job" "wallets_aggregations" {
3747
python_version = "3"
3848
}
3949

40-
worker_type = "G.1X"
50+
worker_type = "G.2X"
4151
number_of_workers = 10
4252
glue_version = "5.0"
43-
default_arguments = var.default_arguments
53+
default_arguments = merge(var.default_arguments, local.iceberg_argument)
4454
timeout = 120
4555
}
4656

@@ -53,9 +63,105 @@ resource "aws_glue_job" "feature_scaling" {
5363
python_version = "3"
5464
}
5565

66+
worker_type = "G.2X"
67+
number_of_workers = 10
68+
glue_version = "5.0"
69+
default_arguments = merge(var.default_arguments, local.iceberg_argument)
70+
timeout = 120
71+
}
72+
73+
resource "aws_glue_job" "spearman_feature_selection" {
74+
name = "Spearman feature selection"
75+
role_arn = var.glue_role_arn
76+
command {
77+
name = "glueetl"
78+
script_location = "s3://${var.glue_script_bucket}/spearman.py"
79+
python_version = "3"
80+
}
81+
82+
worker_type = "G.2X"
83+
number_of_workers = 10
84+
glue_version = "5.0"
85+
default_arguments = var.default_arguments
86+
timeout = 300
87+
}
88+
89+
resource "aws_glue_job" "convert_parquet_to_csv" {
90+
name = "Convert parquet to CSV"
91+
role_arn = var.glue_role_arn
92+
command {
93+
name = "glueetl"
94+
script_location = "s3://${var.glue_script_bucket}/convert_features_to_csv.py"
95+
python_version = "3"
96+
}
97+
98+
worker_type = "G.1X"
99+
number_of_workers = 10
100+
glue_version = "5.0"
101+
default_arguments = var.default_arguments
102+
timeout = 480
103+
}
104+
105+
resource "aws_glue_job" "convert_features_to_recordio" {
106+
name = "Convert features to recordio"
107+
role_arn = var.glue_role_arn
108+
command {
109+
name = "glueetl"
110+
script_location = "s3://${var.glue_script_bucket}/convert_features_to_recordio.py"
111+
python_version = "3"
112+
}
113+
114+
worker_type = "G.2X"
115+
number_of_workers = 10
116+
glue_version = "5.0"
117+
default_arguments = merge(var.default_arguments, local.converting_to_recordio_arguments)
118+
timeout = 180
119+
}
120+
121+
resource "aws_glue_job" "preprocessing_with_string_columns" {
122+
name = "Preprocesssing with string columns"
123+
role_arn = var.glue_role_arn
124+
command {
125+
name = "glueetl"
126+
script_location = "s3://${var.glue_script_bucket}/preprocessing_for_inference.py"
127+
python_version = "3"
128+
}
129+
56130
worker_type = "G.1X"
57131
number_of_workers = 10
58132
glue_version = "5.0"
59133
default_arguments = var.default_arguments
60134
timeout = 120
135+
}
136+
137+
resource "aws_glue_job" "convert_parquet_to_csv_for_visualisation" {
138+
name = "Convert parquet to csv for visualization"
139+
role_arn = var.glue_role_arn
140+
command {
141+
name = "glueetl"
142+
script_location = "s3://${var.glue_script_bucket}/convert_features_to_csv_inference.py"
143+
python_version = "3"
144+
}
145+
146+
worker_type = "G.1X"
147+
number_of_workers = 10
148+
glue_version = "5.0"
149+
default_arguments = var.default_arguments
150+
timeout = 120
151+
}
152+
153+
resource "aws_glue_job" "anomaly_classification" {
154+
name = "Anomaly Classification"
155+
role_arn = var.glue_role_arn
156+
command {
157+
name = "glueetl"
158+
script_location = "s3://${var.glue_script_bucket}/detect_anomaly.py"
159+
python_version = "3"
160+
}
161+
162+
worker_type = "G.1X"
163+
number_of_workers = 10
164+
glue_version = "5.0"
165+
default_arguments = merge(var.default_arguments, local.anomaly_classification_arguments)
166+
timeout = 120
61167
}

infrastructure/modules/glue_jobs/outputs.tf

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,29 @@ output "wallets_aggregations_job_name" {
1111
output "feature_scaling_job_name" {
1212
value = aws_glue_job.feature_scaling.name
1313
description = "Name of the feature_scaling Glue job"
14+
}
15+
16+
output "spearman_feature_selection_job_name" {
17+
value = aws_glue_job.spearman_feature_selection.name
18+
description = "Name of the spearman_feature_selection Glue job"
19+
}
20+
21+
output "convert_parquet_to_csv_job_name" {
22+
value = aws_glue_job.convert_parquet_to_csv.name
23+
description = "Name of the convert_parquet_to_csv Glue job"
24+
}
25+
26+
output "convert_features_to_recordio_job_name" {
27+
value = aws_glue_job.convert_features_to_recordio.name
28+
description = "Name of the convert_features_to_recordio Glue job"
29+
}
30+
31+
output "preprocessing_with_string_columns_job_name" {
32+
value = aws_glue_job.preprocessing_with_string_columns.name
33+
description = "Name of the preprocessing_with_string_columns Glue job"
34+
}
35+
36+
output "convert_parquet_to_csv_for_visualisation_job_name" {
37+
value = aws_glue_job.convert_parquet_to_csv_for_visualisation.name
38+
description = "Name of the convert_parquet_to_csv_for_visualisation Glue job"
1439
}

0 commit comments

Comments (0)