@@ -3,20 +3,30 @@ use std::sync::Arc;
33use opencode_mem_core:: {
44 GlobalKnowledge , KnowledgeInput , KnowledgeSearchResult , KnowledgeType , cap_query_limit,
55} ;
6- use opencode_mem_storage:: traits:: KnowledgeStore ;
6+ use opencode_mem_embeddings:: LazyEmbeddingService ;
7+ use opencode_mem_storage:: traits:: { EmbeddingStore , KnowledgeStore } ;
78use opencode_mem_storage:: { StorageBackend , StorageError } ;
89
910use crate :: ServiceError ;
1011
/// Similarity threshold handed to `find_similar` when a saved knowledge entry
/// is auto-linked to a source observation.
// NOTE(review): presumably a cosine-similarity cutoff in [0, 1] — confirm
// against the `EmbeddingStore::find_similar` contract.
const PROVENANCE_SIMILARITY_THRESHOLD: f32 = 0.75;
13+
/// Service-layer entry point for knowledge operations on top of a shared
/// storage backend, with an optional embedding service.
///
/// The struct is `Clone`; both fields are `Arc`-backed handles.
#[derive(Clone)]
pub struct KnowledgeService {
    /// Shared storage backend that every knowledge call goes through.
    storage: Arc<StorageBackend>,
    /// Optional embedding service. When `None`, `generate_knowledge_embedding`
    /// returns `None` and `spawn_provenance_linking` returns without spawning.
    embeddings: Option<Arc<LazyEmbeddingService>>,
}
1519
1620impl KnowledgeService {
1721 #[ must_use]
18- pub fn new ( storage : Arc < StorageBackend > ) -> Self {
19- Self { storage }
22+ pub fn new (
23+ storage : Arc < StorageBackend > ,
24+ embeddings : Option < Arc < LazyEmbeddingService > > ,
25+ ) -> Self {
26+ Self {
27+ storage,
28+ embeddings,
29+ }
2030 }
2131
2232 pub fn circuit_breaker ( & self ) -> & opencode_mem_storage:: CircuitBreaker {
@@ -52,11 +62,29 @@ impl KnowledgeService {
5262 id : & str ,
5363 input : KnowledgeInput ,
5464 ) -> Result < GlobalKnowledge , ServiceError > {
55- let result = self
56- . storage
57- . guarded ( || self . storage . save_knowledge_with_id ( id, input. clone ( ) ) )
58- . await ;
59- self . with_cb ( result)
65+ let needs_provenance = input. source_observation . is_none ( ) && self . embeddings . is_some ( ) ;
66+
67+ let embedding = self . generate_knowledge_embedding ( & input) . await ;
68+
69+ let result = if let Some ( emb) = embedding {
70+ self . storage
71+ . guarded ( || {
72+ self . storage
73+ . save_knowledge_with_embedding ( id, input. clone ( ) , emb. clone ( ) )
74+ } )
75+ . await
76+ } else {
77+ self . storage
78+ . guarded ( || self . storage . save_knowledge_with_id ( id, input. clone ( ) ) )
79+ . await
80+ } ;
81+ let knowledge = self . with_cb ( result) ?;
82+
83+ if needs_provenance {
84+ self . spawn_provenance_linking ( knowledge. clone ( ) ) ;
85+ }
86+
87+ Ok ( knowledge)
6088 }
6189
6290 pub async fn delete_knowledge ( & self , id : & str ) -> Result < bool , ServiceError > {
@@ -102,9 +130,37 @@ impl KnowledgeService {
102130 self . update_knowledge_usage_batch ( & [ id. to_owned ( ) ] ) . await
103131 }
104132
105- /// Fire-and-forget usage_count increment. Failures are logged at warn level
106- /// and never propagate to the caller — reading knowledge must never fail
107- /// because of a usage tracking issue.
133+ async fn generate_knowledge_embedding ( & self , input : & KnowledgeInput ) -> Option < Vec < f32 > > {
134+ let embeddings = self . embeddings . as_ref ( ) ?;
135+ let text = format ! ( "{} {}" , input. title. trim( ) , input. description) ;
136+ let embeddings_clone = Arc :: clone ( embeddings) ;
137+ let embed_result = tokio:: task:: spawn_blocking ( move || {
138+ use opencode_mem_embeddings:: EmbeddingProvider ;
139+ embeddings_clone. embed ( & text)
140+ } )
141+ . await ;
142+
143+ match embed_result {
144+ Ok ( Ok ( vec) ) => Some ( vec) ,
145+ Ok ( Err ( e) ) => {
146+ tracing:: warn!(
147+ title = %input. title,
148+ error = %e,
149+ "Knowledge embedding generation failed, falling back to non-semantic dedup"
150+ ) ;
151+ None
152+ }
153+ Err ( e) => {
154+ tracing:: warn!(
155+ title = %input. title,
156+ error = %e,
157+ "Knowledge embedding spawn_blocking panicked"
158+ ) ;
159+ None
160+ }
161+ }
162+ }
163+
108164 fn spawn_usage_increment ( & self , ids : Vec < String > ) {
109165 let storage = Arc :: clone ( & self . storage ) ;
110166 tokio:: spawn ( async move {
@@ -118,6 +174,87 @@ impl KnowledgeService {
118174 } ) ;
119175 }
120176
177+ fn spawn_provenance_linking ( & self , knowledge : GlobalKnowledge ) {
178+ let Some ( ref embeddings) = self . embeddings else {
179+ return ;
180+ } ;
181+ if !knowledge. source_observations . is_empty ( ) {
182+ return ;
183+ }
184+
185+ let embeddings = Arc :: clone ( embeddings) ;
186+ let storage = Arc :: clone ( & self . storage ) ;
187+ let knowledge_id = knowledge. id . clone ( ) ;
188+
189+ tokio:: spawn ( async move {
190+ let text = format ! ( "{} {}" , knowledge. title, knowledge. description) ;
191+ let embeddings_clone = Arc :: clone ( & embeddings) ;
192+ let embed_result = tokio:: task:: spawn_blocking ( move || {
193+ use opencode_mem_embeddings:: EmbeddingProvider ;
194+ embeddings_clone. embed ( & text)
195+ } )
196+ . await ;
197+
198+ let embedding = match embed_result {
199+ Ok ( Ok ( vec) ) => vec,
200+ Ok ( Err ( e) ) => {
201+ tracing:: warn!(
202+ knowledge_id = %knowledge_id,
203+ error = %e,
204+ "Provenance linking: embedding generation failed"
205+ ) ;
206+ return ;
207+ }
208+ Err ( e) => {
209+ tracing:: warn!(
210+ knowledge_id = %knowledge_id,
211+ error = %e,
212+ "Provenance linking: spawn_blocking panicked"
213+ ) ;
214+ return ;
215+ }
216+ } ;
217+
218+ let similar = match storage
219+ . find_similar ( & embedding, PROVENANCE_SIMILARITY_THRESHOLD , None )
220+ . await
221+ {
222+ Ok ( Some ( m) ) => m,
223+ Ok ( None ) => return ,
224+ Err ( e) => {
225+ tracing:: warn!(
226+ knowledge_id = %knowledge_id,
227+ error = %e,
228+ "Provenance linking: find_similar failed"
229+ ) ;
230+ return ;
231+ }
232+ } ;
233+
234+ match storage
235+ . link_source_observation ( & knowledge_id, & similar. observation_id )
236+ . await
237+ {
238+ Ok ( true ) => {
239+ tracing:: info!(
240+ knowledge_id = %knowledge_id,
241+ observation_id = %similar. observation_id,
242+ similarity = %similar. similarity,
243+ "Auto-linked provenance via embedding similarity"
244+ ) ;
245+ }
246+ Ok ( false ) => { }
247+ Err ( e) => {
248+ tracing:: warn!(
249+ knowledge_id = %knowledge_id,
250+ error = %e,
251+ "Provenance linking: link_source_observation failed"
252+ ) ;
253+ }
254+ }
255+ } ) ;
256+ }
257+
121258 pub async fn update_knowledge_usage_batch ( & self , ids : & [ String ] ) -> Result < ( ) , ServiceError > {
122259 let result = self
123260 . storage
@@ -144,7 +281,7 @@ impl KnowledgeService {
144281
145282 pub async fn run_confidence_lifecycle ( & self ) -> Result < ( u64 , u64 ) , ServiceError > {
146283 let decayed = self . decay_confidence ( ) . await ?;
147- let archived = self . auto_archive ( 30 ) . await ?;
284+ let archived = self . auto_archive ( 90 ) . await ?;
148285 Ok ( ( decayed, archived) )
149286 }
150287}
0 commit comments