Draft
56 commits
5dad714
feat: add kubernetes app role selection
kyteinsky Mar 5, 2026
089d27a
feat: add thread start and stop logic
kyteinsky Mar 5, 2026
64ffdaf
wip: migrate the indexing process
kyteinsky Mar 9, 2026
03a3f43
wip: parallelize file parsing and processing based on cpu count
kyteinsky Mar 9, 2026
0dc404b
ci: use the kubernetes branch of context_chat
kyteinsky Mar 10, 2026
c733982
fix typo
kyteinsky Mar 10, 2026
dda312f
migrate the update process to be thread based
kyteinsky Mar 11, 2026
b09a93c
fix pydantic types
kyteinsky Mar 11, 2026
11b436c
fix: use a dedicated event to allow app halt without app being disabled
kyteinsky Mar 11, 2026
c88e153
fix fetch url and pydantic types
kyteinsky Mar 11, 2026
cd5241e
fix: use the correct file id
kyteinsky Mar 11, 2026
4958d1d
fix: wip: improve embeddings exception handling
kyteinsky Mar 11, 2026
a049121
fix(ci): update to the latest changes
kyteinsky Mar 11, 2026
795380c
fix(ci): use file to store stderr
kyteinsky Mar 12, 2026
7bc0ed7
fix(ci): add cron jobs
kyteinsky Mar 12, 2026
d94c687
fix(ci): do an occ files scan before cron jobs
kyteinsky Mar 12, 2026
dadc8fa
feat: record indexing errors in content decode function
kyteinsky Mar 16, 2026
f9d86dc
chore: move file fetch inside ingest
kyteinsky Mar 17, 2026
1ade191
fix: truly parallel file parsing and indexing
kyteinsky Mar 18, 2026
12fd1ca
initial pass at request processing
marcelklehr Mar 24, 2026
8aa2471
implement request processing
marcelklehr Mar 25, 2026
2093936
request processing fixes
kyteinsky Mar 26, 2026
36b5f02
chore: drop commented code
kyteinsky Mar 26, 2026
85d29f1
fix(ci): parse json output from the stats command
kyteinsky Mar 26, 2026
4c6d01b
fix: seek to 0 to read the full buffer
kyteinsky Mar 26, 2026
51774ff
fix(ci): 3% tolerance
kyteinsky Mar 26, 2026
c81b675
fix(ci): wait longer for EM server
kyteinsky Mar 26, 2026
6817f89
fix: don't process files or requests until the EM server is healthy
kyteinsky Mar 30, 2026
104a37a
tests: Increase testing time to allow backend to ingest more sources
marcelklehr Apr 1, 2026
b3b461a
fix: More log statements
marcelklehr Apr 1, 2026
a4a88da
tests: Set wait time back to 90
marcelklehr Apr 1, 2026
0c52747
fix: Reduce worker count on github actions
marcelklehr Apr 1, 2026
e676c32
fix(exec_in_proc): Raise RuntimeError if exitcode is non-zero
marcelklehr Apr 1, 2026
b027ff3
fix(indexing): Reduce memory pressure on gh actions
marcelklehr Apr 1, 2026
19b773f
fix(indexing): Fallback to batch_size=1 if embed_sources is killed
marcelklehr Apr 1, 2026
bde0bc5
fix: log stdout and stderr from subprocesses
kyteinsky Apr 2, 2026
4de591f
fix: don't raise before std* is captured
kyteinsky Apr 2, 2026
4deda84
feat: log cpu count and memory info of the system
kyteinsky Apr 2, 2026
ad0eac7
fix: catch BaseException in subprocess
kyteinsky Apr 2, 2026
36bcfb7
fix(utils): Improve exec_in_proc to handle more failure modes
marcelklehr Apr 2, 2026
47eaf72
one more stab at a fix
kyteinsky Apr 3, 2026
309ab2b
do not throw away the valid result even with exitcode 1
kyteinsky Apr 3, 2026
e1763ac
fix: use forkserver as process start method
kyteinsky Apr 3, 2026
3301652
fix(ci): consider eligible files as the total files count
kyteinsky Apr 3, 2026
32aa374
fix: use logging config in forkserver and other fixes
kyteinsky Apr 3, 2026
33ee38a
fix: remove extra diagnostics
kyteinsky Apr 3, 2026
d9ebdac
fix: use zip on the subset of filtered sources
kyteinsky Apr 3, 2026
ea77480
fix(em): use tcp socket connection check
kyteinsky Apr 3, 2026
1ce237a
fix(ci): remove github CI restrictions
kyteinsky Apr 3, 2026
d82e01b
fix: remove unused code and some de-duplication
kyteinsky Apr 3, 2026
286db22
fix(mp): run repairs and config file check only in MainProcess
kyteinsky Apr 3, 2026
726eb64
fix: attach source_ids as keys in json logs
kyteinsky Apr 7, 2026
073f9d0
fix(ci): upload db dump artifacts
kyteinsky Apr 7, 2026
13ea740
fix: retry PGVector object creation if table already exists
kyteinsky Apr 7, 2026
dcb04e7
fix: unique db dump artifact id
kyteinsky Apr 7, 2026
dc1d57b
fix(ci): log stats before exit
kyteinsky Apr 7, 2026
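Several of the commits above (e1763ac, 32aa374) revolve around switching the multiprocessing start method to forkserver. A minimal sketch of what that change typically looks like in Python — an illustration under assumed names, not the PR's actual code:

```python
import multiprocessing as mp

def square(n: int) -> int:
    return n * n

if __name__ == '__main__':
    # forkserver starts workers from a clean helper process instead of
    # forking the (possibly multi-threaded) parent, avoiding inherited
    # locks and sockets; logging config must be re-applied in workers
    mp.set_start_method('forkserver', force=True)
    with mp.Pool(processes=2) as pool:
        print(pool.map(square, range(4)))  # → [0, 1, 4, 9]
```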
125 changes: 99 additions & 26 deletions .github/workflows/integration-test.yml
@@ -89,7 +89,7 @@ jobs:
POSTGRES_USER: root
POSTGRES_PASSWORD: rootpassword
POSTGRES_DB: nextcloud
options: --health-cmd pg_isready --health-interval 5s --health-timeout 2s --health-retries 5
options: --health-cmd pg_isready --health-interval 5s --health-timeout 2s --health-retries 5 --name postgres --hostname postgres

steps:
- name: Checkout server
@@ -113,6 +113,8 @@ jobs:
repository: nextcloud/context_chat
path: apps/context_chat
persist-credentials: false
# todo: remove later
ref: feat/reverse-content-flow

- name: Checkout backend
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4
@@ -167,6 +169,10 @@ jobs:
cd ..
rm -rf documentation

- name: Run files scan
run: |
./occ files:scan --all

- name: Setup python 3.11
uses: actions/setup-python@42375524e23c412d93fb67b49958b491fce71c38 # v5
with:
@@ -195,28 +201,91 @@ jobs:
timeout 10 ./occ app_api:daemon:register --net host manual_install "Manual Install" manual-install http localhost http://localhost:8080
timeout 120 ./occ app_api:app:register context_chat_backend manual_install --json-info "{\"appid\":\"context_chat_backend\",\"name\":\"Context Chat Backend\",\"daemon_config_name\":\"manual_install\",\"version\":\"${{ fromJson(steps.appinfo.outputs.result).version }}\",\"secret\":\"12345\",\"port\":10034,\"scopes\":[],\"system_app\":0}" --force-scopes --wait-finish
ls -la context_chat_backend/persistent_storage/*
sleep 30 # Wait for the em server to get ready

- name: Scan files, baseline
run: |
./occ files:scan admin
./occ context_chat:scan admin -m text/plain

- name: Check python memory usage
- name: Initial memory usage check
run: |
ps -p $(cat pid.txt) -o pid,cmd,%mem,rss --sort=-%mem
ps -p $(cat pid.txt) -o %mem --no-headers > initial_mem.txt

- name: Scan files
- name: Run cron jobs
run: |
./occ files:scan admin
./occ context_chat:scan admin -m text/markdown &
./occ context_chat:scan admin -m text/x-rst
# every 10 seconds indefinitely
while true; do
php cron.php
sleep 10
done &
sleep 30
# list all the bg jobs
./occ background-job:list

- name: Initial dump of DB with context_chat_queue populated
run: |
docker exec postgres pg_dump nextcloud > /tmp/0_pgdump_nextcloud

- name: Check python memory usage
- name: Periodically check context_chat stats for 15 minutes to allow the backend to index the files
run: |
ps -p $(cat pid.txt) -o pid,cmd,%mem,rss --sort=-%mem
ps -p $(cat pid.txt) -o %mem --no-headers > after_scan_mem.txt
success=0
echo "::group::Checking stats periodically for 15 minutes to allow the backend to index the files"
for i in {1..90}; do
echo "Checking stats, attempt $i..."

stats_err=$(mktemp)
stats=$(timeout 5 ./occ context_chat:stats --json 2>"$stats_err")
stats_exit=$?
echo "Stats output:"
echo "$stats"
if [ -s "$stats_err" ]; then
echo "Stderr:"
cat "$stats_err"
fi
echo "---"
rm -f "$stats_err"

# Check for critical errors in output
if [ $stats_exit -ne 0 ] || echo "$stats" | grep -q "Error during request"; then
echo "Backend connection error detected (exit=$stats_exit), retrying..."
sleep 10
continue
fi

# Extract total eligible files
total_eligible_files=$(echo "$stats" | jq '.eligible_files_count' || echo "")

# Extract indexed documents count (files__default)
indexed_count=$(echo "$stats" | jq '.vectordb_document_counts.files__default' || echo "")

echo "Total eligible files: $total_eligible_files"
echo "Indexed documents (files__default): $indexed_count"

diff=$((total_eligible_files - indexed_count))
threshold=$((total_eligible_files * 3 / 100))

# Check if difference is within tolerance
if [ $diff -le $threshold ]; then
echo "Indexing within 3% tolerance (diff=$diff, threshold=$threshold)"
success=1
break
else
progress=$((diff * 100 / total_eligible_files))
echo "Outside 3% tolerance: diff=$diff (${progress}%), threshold=$threshold"
fi

# Check if backend is still alive
ccb_alive=$(ps -p $(cat pid.txt) -o cmd= | grep -c "main.py" || echo "0")
if [ "$ccb_alive" -eq 0 ]; then
echo "Error: Context Chat Backend process is not running. Exiting."
exit 1
fi

sleep 10
done

echo "::endgroup::"

if [ $success -ne 1 ]; then
echo "Max attempts reached"
exit 1
fi

- name: Run the prompts
run: |
@@ -250,18 +319,9 @@ jobs:
echo "Memory usage during scan is stable. No memory leak detected."
fi

- name: Compare memory usage and detect leak
- name: Final dump of DB with vectordb populated
run: |
initial_mem=$(cat after_scan_mem.txt | tr -d ' ')
final_mem=$(cat after_prompt_mem.txt | tr -d ' ')
echo "Initial Memory Usage: $initial_mem%"
echo "Memory Usage after prompt: $final_mem%"

if (( $(echo "$final_mem > $initial_mem" | bc -l) )); then
echo "Memory usage has increased during prompt. Possible memory leak detected!"
else
echo "Memory usage during prompt is stable. No memory leak detected."
fi
docker exec postgres pg_dump nextcloud > /tmp/1_pgdump_nextcloud

- name: Show server logs
if: always()
@@ -298,6 +358,19 @@ jobs:
run: |
tail -v -n +1 context_chat_backend/persistent_storage/logs/em_server.log* || echo "No logs in logs directory"

- name: Upload database dumps
uses: actions/upload-artifact@v4
with:
name: database-dumps-${{ matrix.server-versions }}-php@${{ matrix.php-versions }}
path: |
/tmp/0_pgdump_nextcloud
/tmp/1_pgdump_nextcloud

- name: Final stats log
run: |
./occ context_chat:stats
./occ context_chat:stats --json

summary:
permissions:
contents: none
14 changes: 14 additions & 0 deletions appinfo/info.xml
@@ -82,5 +82,19 @@ Setup background job workers as described here: https://docs.nextcloud.com/serve
<description>Password to be used for authenticating requests to the OpenAI-compatible endpoint set in CC_EM_BASE_URL.</description>
</variable>
</environment-variables>
<k8s-service-roles>
<role>
<name>rp</name>
<display-name>Request Processing Mode</display-name>
<env>APP_ROLE=rp</env>
<expose>true</expose>
</role>
<role>
<name>indexing</name>
<display-name>Indexing Mode</display-name>
<env>APP_ROLE=indexing</env>
<expose>false</expose>
</role>
</k8s-service-roles>
</external-app>
</info>
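The two service roles above differ only in the APP_ROLE environment variable they inject. A sketch of how the backend could branch on that variable — the helper name and the default value are assumptions, not taken from this PR:

```python
import os

def resolve_app_role() -> str:
    # 'rp' = request processing (exposed), 'indexing' = indexing worker;
    # defaulting to 'rp' when unset is an assumption, not from the PR
    role = os.environ.get('APP_ROLE', 'rp')
    if role not in ('rp', 'indexing'):
        raise ValueError(f'unknown APP_ROLE: {role!r}')
    return role

print(resolve_app_role())
```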
83 changes: 44 additions & 39 deletions context_chat_backend/chain/ingest/doc_loader.py
@@ -3,25 +3,24 @@
# SPDX-License-Identifier: AGPL-3.0-or-later
#

import logging
import re
import tempfile
from collections.abc import Callable
from typing import BinaryIO
from io import BytesIO

import docx2txt
from epub2txt import epub2txt
from fastapi import UploadFile
from langchain_unstructured import UnstructuredLoader
from odfdo import Document
from pandas import read_csv, read_excel
from pypdf import PdfReader
from pypdf.errors import FileNotDecryptedError as PdfFileNotDecryptedError
from striprtf import striprtf

logger = logging.getLogger('ccb.doc_loader')
from ...types import IndexingException, SourceItem

def _temp_file_wrapper(file: BinaryIO, loader: Callable, sep: str = '\n') -> str:

def _temp_file_wrapper(file: BytesIO, loader: Callable, sep: str = '\n') -> str:
raw_bytes = file.read()
with tempfile.NamedTemporaryFile(mode='wb') as tmp:
tmp.write(raw_bytes)
@@ -35,49 +34,49 @@ def _temp_file_wrapper(file: BinaryIO, loader: Callable, sep: str = '\n') -> str

# -- LOADERS -- #

def _load_pdf(file: BinaryIO) -> str:
def _load_pdf(file: BytesIO) -> str:
pdf_reader = PdfReader(file)
return '\n\n'.join([page.extract_text().strip() for page in pdf_reader.pages])


def _load_csv(file: BinaryIO) -> str:
def _load_csv(file: BytesIO) -> str:
return read_csv(file).to_string(header=False, na_rep='')


def _load_epub(file: BinaryIO) -> str:
def _load_epub(file: BytesIO) -> str:
return _temp_file_wrapper(file, epub2txt).strip()


def _load_docx(file: BinaryIO) -> str:
def _load_docx(file: BytesIO) -> str:
return docx2txt.process(file).strip()


def _load_odt(file: BinaryIO) -> str:
def _load_odt(file: BytesIO) -> str:
return _temp_file_wrapper(file, lambda fp: Document(fp).get_formatted_text()).strip()


def _load_ppt_x(file: BinaryIO) -> str:
def _load_ppt_x(file: BytesIO) -> str:
return _temp_file_wrapper(file, lambda fp: UnstructuredLoader(fp).load()).strip()


def _load_rtf(file: BinaryIO) -> str:
def _load_rtf(file: BytesIO) -> str:
return striprtf.rtf_to_text(file.read().decode('utf-8', 'ignore')).strip()


def _load_xml(file: BinaryIO) -> str:
def _load_xml(file: BytesIO) -> str:
data = file.read().decode('utf-8', 'ignore')
data = re.sub(r'</.+>', '', data)
return data.strip()


def _load_xlsx(file: BinaryIO) -> str:
def _load_xlsx(file: BytesIO) -> str:
return read_excel(file, na_filter=False).to_string(header=False, na_rep='')


def _load_email(file: BinaryIO, ext: str = 'eml') -> str | None:
def _load_email(file: BytesIO, ext: str = 'eml') -> str:
# NOTE: msg format is not tested
if ext not in ['eml', 'msg']:
return None
raise IndexingException(f'Unsupported email format: {ext}')

# TODO: implement attachment partitioner using unstructured.partition.partition_{email,msg}
# since langchain does not pass through the attachment_partitioner kwarg
@@ -115,30 +114,36 @@ def attachment_partitioner(
}


def decode_source(source: UploadFile) -> str | None:
def decode_source(source: SourceItem) -> str:
'''
Raises
------
IndexingException
'''

io_obj: BytesIO | None = None
try:
# .pot files are powerpoint templates but also plain text files,
# so we skip them to prevent decoding errors
if source.headers['title'].endswith('.pot'):
return None

mimetype = source.headers['type']
if mimetype is None:
return None

if _loader_map.get(mimetype):
result = _loader_map[mimetype](source.file)
source.file.close()
return result.encode('utf-8', 'ignore').decode('utf-8', 'ignore')

result = source.file.read().decode('utf-8', 'ignore')
source.file.close()
return result
except PdfFileNotDecryptedError:
logger.warning(f'PDF file ({source.filename}) is encrypted and cannot be read')
return None
except Exception:
logger.exception(f'Error decoding source file ({source.filename})', stack_info=True)
return None
if source.title.endswith('.pot'):
raise IndexingException('PowerPoint template files (.pot) are not supported')

if isinstance(source.content, str):
io_obj = BytesIO(source.content.encode('utf-8', 'ignore'))
else:
io_obj = source.content

if _loader_map.get(source.type):
result = _loader_map[source.type](io_obj)
return result.encode('utf-8', 'ignore').decode('utf-8', 'ignore').strip()

return io_obj.read().decode('utf-8', 'ignore').strip()
except IndexingException:
raise
except PdfFileNotDecryptedError as e:
raise IndexingException('PDF file is encrypted and cannot be read') from e
except Exception as e:
raise IndexingException(f'Error decoding source file: {e}') from e
finally:
source.file.close() # Ensure file is closed after processing
if io_obj is not None:
io_obj.close()