-import pickle
-import tempfile
 import logging
+import pandas as pd
+from sqlalchemy import create_engine

+from typing import List
 from scystream.sdk.core import entrypoint
 from scystream.sdk.env.settings import (
     EnvSettings,
     InputSettings,
     OutputSettings,
-    FileSettings
+    FileSettings,
+    PostgresSettings
 )
 from scystream.sdk.file_handling.s3_manager import S3Operations

 from preprocessing.core import Preprocessor
 from preprocessing.loader import TxtLoader, BibLoader
+from preprocessing.models import DocumentRecord, PreprocessedDocument

 logging.basicConfig(
     level=logging.INFO,
@@ -21,16 +24,8 @@
 logger = logging.getLogger(__name__)


-class DTMFileOutput(FileSettings, OutputSettings):
-    __identifier__ = "dtm_output"
-
-    FILE_EXT: str = "pkl"
-
-
-class VocabFileOutput(FileSettings, OutputSettings):
-    __identifier__ = "vocab_output"
-
-    FILE_EXT: str = "pkl"
+class NormalizedDocsOutput(PostgresSettings, OutputSettings):
+    __identifier__ = "normalized_docs"


 class TXTFileInput(FileSettings, InputSettings):
@@ -56,8 +51,7 @@ class PreprocessTXT(EnvSettings):
     TXT_DOWNLOAD_PATH: str = "/tmp/input.txt"

     txt_input: TXTFileInput
-    dtm_output: DTMFileOutput
-    vocab_output: VocabFileOutput
+    normalized_docs_output: NormalizedDocsOutput


 class PreprocessBIB(EnvSettings):
@@ -71,13 +65,37 @@ class PreprocessBIB(EnvSettings):
     BIB_DOWNLOAD_PATH: str = "/tmp/input.bib"

     bib_input: BIBFileInput
-    dtm_output: DTMFileOutput
-    vocab_output: VocabFileOutput
+    normalized_docs_output: NormalizedDocsOutput
+
+
+def _write_preprocessed_docs_to_postgres(
+    preprocessed_output: List[PreprocessedDocument],
+    settings: PostgresSettings
+):
+    df = pd.DataFrame([
+        {
+            "doc_id": d.doc_id,
+            "tokens": d.tokens
+        }
+        for d in preprocessed_output
+    ])
+
+    logger.info(f"Writing {len(df)} processed documents to DB table "
+                f"'{settings.DB_TABLE}'…")
+    engine = create_engine(
+        f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}"
+        f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/"
+    )
+
+    df.to_sql(settings.DB_TABLE, engine, if_exists="replace", index=False)
+
+    logger.info(f"Successfully stored normalized documents into "
+                f"'{settings.DB_TABLE}'.")


-def _preprocess_and_store(texts, settings):
+def _preprocess_and_store(documents: List[DocumentRecord], settings):
     """Shared preprocessing logic for TXT and BIB."""
-    logger.info(f"Starting preprocessing with {len(texts)} documents")
+    logger.info(f"Starting preprocessing with {len(documents)} documents")

     pre = Preprocessor(
         language=settings.LANGUAGE,
@@ -88,27 +106,11 @@ def _preprocess_and_store(texts, settings):
         ngram_max=settings.NGRAM_MAX,
     )

-    pre.texts = texts
-    pre.analyze_texts()
-
-    pre.generate_bag_of_words()
-
-    dtm, vocab = pre.generate_document_term_matrix()
-
-    with tempfile.NamedTemporaryFile(suffix="_dtm.pkl") as tmp_dtm, \
-            tempfile.NamedTemporaryFile(suffix="_vocab.pkl") as tmp_vocab:
-
-        pickle.dump(dtm, tmp_dtm)
-        tmp_dtm.flush()
-
-        pickle.dump(vocab, tmp_vocab)
-        tmp_vocab.flush()
-
-        logger.info("Uploading DTM to S3...")
-        S3Operations.upload(settings.dtm_output, tmp_dtm.name)
+    pre.documents = documents
+    result = pre.generate_normalized_output()

-        logger.info("Uploading vocabulary to S3...")
-        S3Operations.upload(settings.vocab_output, tmp_vocab.name)
+    _write_preprocessed_docs_to_postgres(
+        result, settings.normalized_docs_output)

     logger.info("Preprocessing completed successfully.")

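For context, a minimal sketch (not part of the commit) of how the normalized documents could be read back out of the new Postgres output; the connection values and the "normalized_docs" table name below are placeholders standing in for the PostgresSettings fields (PG_USER, PG_PASS, PG_HOST, PG_PORT, DB_TABLE) used in the diff:

# Sketch only: read back the table written by _write_preprocessed_docs_to_postgres.
# Credentials and table name are placeholders, not values from this repository.
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:password@localhost:5432/")

# Load the stored documents into a DataFrame and inspect the first rows.
docs = pd.read_sql_table("normalized_docs", engine)
print(docs[["doc_id", "tokens"]].head())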