-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcore_processing.py
More file actions
240 lines (200 loc) · 9.68 KB
/
core_processing.py
File metadata and controls
240 lines (200 loc) · 9.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import tempfile
import os
import hashlib
import logging
import gzip
import magic # For python-magic
from typing import List, Tuple
# Langchain and Unstructured related imports
from langchain_core.documents import Document
from langchain_unstructured import UnstructuredLoader
from unstructured.cleaners.core import clean_extra_whitespace
from langchain.text_splitter import RecursiveCharacterTextSplitter
from pydantic import BaseModel # To be used as a type hint for Langchain Document
# Project-specific imports
from config import settings
from models import DocumentItem
logger = logging.getLogger(__name__)
class DocumentProcessingError(Exception):
    """Signals that a document could not be loaded or processed.

    The loading helpers in this module raise it in place of whatever
    lower-level exception caused the failure.
    """
def is_gz_file(file_path: str) -> bool:
    """
    Tell whether a file looks GZip compressed.

    Only the leading two bytes are read and compared with the GZip magic
    number; the remainder of the file is never inspected.

    Args:
        file_path (str): Location of the file to examine.

    Returns:
        bool: True when the file starts with the GZip magic bytes, False
        otherwise (including files shorter than two bytes).
    """
    gzip_magic = b'\x1f\x8b'
    with open(file_path, 'rb') as handle:
        header = handle.read(len(gzip_magic))
    return header == gzip_magic
def get_mime_type(file_path: str) -> str:
    """
    Report the MIME type that python-magic detects for a file.

    Args:
        file_path (str): Location of the file to inspect.

    Returns:
        str: The value returned by ``magic.from_file`` in MIME mode.
    """
    return magic.from_file(file_path, mime=True)
def load_by_unstructured(file_path: str) -> List[Document]:
    """
    Loads a document using UnstructuredLoader with specific post-processors
    and a basic chunking strategy.

    The loader is configured to clean extra whitespace and to use the "basic"
    chunking strategy with a very large character limit, which aims to load
    the document as a whole so that any real splitting happens later.

    Args:
        file_path (str): The path to the file to load.

    Returns:
        List[Document]: The Langchain Document objects produced by the loader.

    Raises:
        DocumentProcessingError: If constructing or running UnstructuredLoader
            fails for any reason. The original exception is attached as
            ``__cause__``.
    """
    try:
        loader = UnstructuredLoader(
            file_path=file_path,
            post_processors=[clean_extra_whitespace],
            chunking_strategy="basic",  # This aims to load the whole doc content
            max_characters=10000000,  # Very large to avoid chunking by Unstructured itself
            include_orig_elements=False,
        )
        return loader.load()
    except Exception as e:
        # Log the full traceback here, then surface a single module-level
        # exception type so callers only need to handle DocumentProcessingError.
        logger.error(f"UnstructuredLoader failed for path {file_path}: {e}", exc_info=True)
        raise DocumentProcessingError(
            f"Failed to process document with UnstructuredLoader: {str(e)}"
        ) from e
def load(uploaded_file_path: str, settings_obj) -> Tuple[List[Document], str]:
    """
    Loads an uploaded file into Langchain Documents, handling GZip
    decompression if necessary.

    When the file starts with the GZip magic bytes it is decompressed into a
    new temporary file, and both document loading and MIME detection run on
    that decompressed copy. Otherwise the uploaded file is used directly.

    Args:
        uploaded_file_path (str): The path to the (potentially gzipped) uploaded file.
        settings_obj: The application settings object. Only its
            ``delete_temp_file`` attribute is read, to decide whether the
            decompressed temporary file is removed afterwards.

    Returns:
        Tuple[List[Document], str]: The loaded Langchain Document objects and
        the detected MIME type of the processed (decompressed, if applicable)
        file content.

    Raises:
        DocumentProcessingError: If GZip decompression or document processing
            fails. Errors raised while probing the file for the GZip header,
            or by MIME detection on a non-gzipped file, are not wrapped.
    """
    # Local import: zlib is only needed to recognise corrupt-stream errors.
    import zlib

    if not is_gz_file(uploaded_file_path):
        mime_type = get_mime_type(uploaded_file_path)
        # Pass the original path directly to load_by_unstructured
        docs = load_by_unstructured(uploaded_file_path)
        return docs, mime_type

    logger.info(f'File {uploaded_file_path} is gzip compressed. Decompressing...')
    decompressed_file_path = None
    try:
        try:
            # Create a new temporary file for the decompressed content.
            # delete=False so the path stays valid after the handle is closed.
            with tempfile.NamedTemporaryFile(
                    mode='wb', delete=False, suffix=".gz_decompressed") as decompressed_file_obj:
                decompressed_file_path = decompressed_file_obj.name
                with gzip.open(uploaded_file_path, 'rb') as gzipped_file:
                    # Stream in 1MB chunks to keep memory use bounded.
                    while chunk := gzipped_file.read(1024 * 1024):
                        decompressed_file_obj.write(chunk)
        except (gzip.BadGzipFile, EOFError, zlib.error) as e:
            # BadGzipFile: invalid header/CRC; EOFError: truncated stream;
            # zlib.error: corrupt compressed data.
            logger.error(f"Gzip decompression failed for {uploaded_file_path}: {e}", exc_info=True)
            raise DocumentProcessingError(f"Failed to decompress GZip file: {str(e)}") from e

        # Now load from the decompressed file path
        docs = load_by_unstructured(decompressed_file_path)
        mime_type = get_mime_type(decompressed_file_path)
        return docs, mime_type
    except DocumentProcessingError:
        # Already logged and wrapped at the point of failure; do not wrap twice.
        raise
    except Exception as e:  # Catch other errors during decompressed load
        logger.error(f"Error processing decompressed file from {uploaded_file_path} (temp path {decompressed_file_path}): {e}", exc_info=True)
        raise DocumentProcessingError(f"Failed to process decompressed file: {str(e)}") from e
    finally:
        # Runs on success and on failure, so the temp file is not leaked
        # whenever the settings ask for cleanup.
        if decompressed_file_path and settings_obj.delete_temp_file and os.path.exists(decompressed_file_path):
            os.remove(decompressed_file_path)
def get_doc_id(doc: Document) -> str:
    """
    Derives a short identifier for a document from its 'source' metadata.

    The identifier is the first 12 hexadecimal characters of the MD5 digest
    of the UTF-8 encoded source value. If the document has no dict-typed
    'metadata' attribute, or that dict has no 'source' key, the empty string
    is hashed instead, so all such documents receive the same identifier.

    Args:
        doc (Document): Object expected to expose a 'metadata' dict with a
            'source' entry; the source value is converted with ``str``.

    Returns:
        str: A 12-character hexadecimal identifier.
    """
    metadata = getattr(doc, 'metadata', None)
    if isinstance(metadata, dict) and 'source' in metadata:
        seed = str(metadata['source'])
    else:
        seed = ""
    return hashlib.md5(seed.encode('utf-8')).hexdigest()[:12]
def split(doc: Document, q_chunk_size: int, q_chunk_overlap: int) -> List[DocumentItem]:
    """
    Splits a Langchain Document into DocumentItem chunks using
    RecursiveCharacterTextSplitter.

    Content longer than ``q_chunk_size`` is split; shorter non-empty content
    becomes a single item. Every item gets its own copy of the document
    metadata, extended with an 'id' of the form ``<source id>_chunk_<n>`` and
    a 'chunk_index'. The metadata of ``doc`` itself is never modified.

    Args:
        doc (Document): The document to split. Its ``page_content`` is treated
            as empty if missing or not a string, and its ``metadata`` as an
            empty dict if missing or not a dict.
        q_chunk_size (int): The target maximum size for each chunk in characters.
        q_chunk_overlap (int): The number of characters to overlap between
            consecutive chunks.

    Returns:
        List[DocumentItem]: One item per chunk, in order; empty when the
        document has no usable text content.
    """
    page_content = getattr(doc, 'page_content', '')
    if not isinstance(page_content, str) or not page_content:
        logger.debug("Empty page_content, no items created from splitting.")
        return []

    if len(page_content) > q_chunk_size:
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=q_chunk_size,
            chunk_overlap=q_chunk_overlap,
            length_function=len,
            # NOTE(review): split_text returns plain strings, so start
            # indices are not carried into the items built below.
            add_start_index=True,
        )
        texts = text_splitter.split_text(page_content)
        logger.debug(f"Number of chunks created: {len(texts)}")
        if len(texts) > 1:
            logger.debug(f"Sample chunk [1] (first 100 chars): {texts[1][:100]}...")
    else:
        logger.debug(f"Content length {len(page_content)} <= chunk_size {q_chunk_size}. Full content not split, creating one item.")
        texts = [page_content]

    doc_metadata = getattr(doc, 'metadata', {})
    if not isinstance(doc_metadata, dict):  # Ensure metadata is a dict
        doc_metadata = {}

    # All chunks share the ID derived from the document source; the chunk
    # index is appended so each chunk ID is distinct within that source.
    source_doc_id = get_doc_id(doc)

    # Build a fresh metadata dict per chunk so neither the caller's document
    # nor sibling chunks observe each other's 'id' / 'chunk_index'.
    return [
        DocumentItem(
            content=text_chunk,
            metadata={
                **doc_metadata,
                'id': f"{source_doc_id}_chunk_{i}",
                'chunk_index': i,
            },
        )
        for i, text_chunk in enumerate(texts)
    ]