TahaRasouli committed
Commit 58857c9 · verified · 1 Parent(s): c70ad2a

Update unified_document_processor.py

Files changed (1)
  1. unified_document_processor.py +356 -617
unified_document_processor.py CHANGED
@@ -1,4 +1,4 @@
1
- from typing import List, Dict, Union
2
  from groq import Groq
3
  import chromadb
4
  import os
@@ -18,6 +18,138 @@ class CustomEmbeddingFunction:
18
  embeddings = self.model.encode(input)
19
  return embeddings.tolist()
20
 
21
  class UnifiedDocumentProcessor:
22
  def __init__(self, groq_api_key, collection_name="unified_content"):
23
  """Initialize the processor with necessary clients"""
@@ -25,6 +157,7 @@ class UnifiedDocumentProcessor:
25
 
26
  # XML-specific settings
27
  self.max_elements_per_chunk = 50
 
28
 
29
  # PDF-specific settings
30
  self.pdf_chunk_size = 500
@@ -52,32 +185,37 @@ class UnifiedDocumentProcessor:
52
  )
53
 
54
  def _initialize_nltk(self):
55
- """Ensure both NLTK resources are available."""
56
  try:
57
  nltk.download('punkt')
58
- try:
59
- nltk.data.find('tokenizers/punkt_tab')
60
- except LookupError:
61
- nltk.download('punkt_tab')
 
62
  except Exception as e:
63
- print(f"Warning: Error downloading NLTK resources: {str(e)}")
64
- print("Falling back to basic sentence splitting...")
65
-
66
- def _basic_sentence_split(self, text: str) -> List[str]:
67
- """Fallback method for sentence tokenization"""
68
- sentences = []
69
- current = ""
70
 
71
- for char in text:
72
- current += char
73
- if char in ['.', '!', '?'] and len(current.strip()) > 0:
74
- sentences.append(current.strip())
75
- current = ""
76
-
77
- if current.strip():
78
- sentences.append(current.strip())
 
 
79
 
80
- return sentences
81
 
82
  def extract_text_from_pdf(self, pdf_path: str) -> str:
83
  """Extract text from PDF file"""
@@ -93,12 +231,7 @@ class UnifiedDocumentProcessor:
93
 
94
  def chunk_text(self, text: str) -> List[str]:
95
  """Split text into chunks while preserving sentence boundaries"""
96
- try:
97
- sentences = sent_tokenize(text)
98
- except Exception as e:
99
- print(f"Warning: Using fallback sentence splitting: {str(e)}")
100
- sentences = self._basic_sentence_split(text)
101
-
102
  chunks = []
103
  current_chunk = []
104
  current_size = 0
@@ -125,646 +258,252 @@ class UnifiedDocumentProcessor:
125
 
126
  return chunks
127
 
128
- def process_xml_file(self, xml_file_path: str) -> Dict:
129
- """Process XML file with optimized batching and reduced database operations"""
130
- try:
131
- tree = ET.parse(xml_file_path)
132
- root = tree.getroot()
133
-
134
- # Process XML into chunks efficiently
135
- chunks = []
136
- paths = []
137
-
138
- def process_element(element, current_path=""):
139
- # Create element description
140
- element_info = []
141
-
142
- # Add basic information
143
- element_info.append(f"Element: {element.tag}")
144
-
145
- # Process namespace only if present
146
- if '}' in element.tag:
147
- namespace = element.tag.split('}')[0].strip('{')
148
- element_info.append(f"Namespace: {namespace}")
149
-
150
- # Process important attributes only
151
- important_attrs = ['NodeId', 'BrowseName', 'DisplayName', 'Description', 'DataType']
152
- attrs = {k: v for k, v in element.attrib.items() if k in important_attrs}
153
- if attrs:
154
- for key, value in attrs.items():
155
- element_info.append(f"{key}: {value}")
156
-
157
- # Process text content if meaningful
158
- if element.text and element.text.strip():
159
- element_info.append(f"Content: {element.text.strip()}")
160
-
161
- # Create chunk text
162
- chunk_text = " | ".join(element_info)
163
- new_path = f"{current_path}/{element.tag}" if current_path else element.tag
164
-
165
- chunks.append(chunk_text)
166
- paths.append(new_path)
167
-
168
- # Process children
169
- for child in element:
170
- process_element(child, new_path)
171
-
172
- # Start processing from root
173
- process_element(root)
174
- print(f"Generated {len(chunks)} XML chunks")
175
-
176
- # Batch process into database
177
- batch_size = 100 # Increased batch size
178
- results = []
179
-
180
- for i in range(0, len(chunks), batch_size):
181
- batch_end = min(i + batch_size, len(chunks))
182
- batch_chunks = chunks[i:batch_end]
183
- batch_paths = paths[i:batch_end]
184
-
185
- # Prepare batch metadata
186
- batch_metadata = [{
187
- 'source_file': os.path.basename(xml_file_path),
188
- 'content_type': 'xml',
189
- 'chunk_id': idx,
190
- 'total_chunks': len(chunks),
191
- 'xml_path': path,
192
- 'timestamp': str(datetime.datetime.now())
193
- } for idx, path in enumerate(batch_paths, start=i)]
194
-
195
- # Generate batch IDs
196
- batch_ids = [
197
- f"{os.path.basename(xml_file_path)}_xml_{idx}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
198
- for idx in range(i, batch_end)
199
- ]
200
-
201
- # Store batch in vector database
202
- self.collection.add(
203
- documents=batch_chunks,
204
- metadatas=batch_metadata,
205
- ids=batch_ids
206
- )
207
-
208
- # Track results
209
- results.extend([{
210
- 'chunk': idx,
211
- 'success': True,
212
- 'doc_id': doc_id,
213
- 'text': text
214
- } for idx, (doc_id, text) in enumerate(zip(batch_ids, batch_chunks), start=i)])
215
-
216
- # Print progress
217
- print(f"Processed chunks {i} to {batch_end} of {len(chunks)}")
218
 
219
- return {
220
- 'success': True,
221
- 'total_chunks': len(chunks),
222
- 'results': results
223
- }
224
-
225
- except Exception as e:
226
- print(f"Error processing XML: {str(e)}")
227
- return {
228
- 'success': False,
229
- 'error': str(e)
230
- }
231
 
232
- def process_pdf_file(self, pdf_file_path: str) -> Dict:
233
- """Process PDF file with direct embedding"""
234
- try:
235
- full_text = self.extract_text_from_pdf(pdf_file_path)
236
- chunks = self.chunk_text(full_text)
237
 
238
- print(f"Split PDF into {len(chunks)} chunks")
239
- results = []
240
 
241
- for i, chunk in enumerate(chunks):
242
- try:
243
- metadata = {
244
- 'source_file': os.path.basename(pdf_file_path),
245
- 'content_type': 'pdf',
246
- 'chunk_id': i,
247
- 'total_chunks': len(chunks),
248
- 'timestamp': str(datetime.datetime.now()),
249
- 'chunk_size': len(chunk.split())
250
- }
251
-
252
- # Store directly in vector database
253
- doc_id = self.store_in_vector_db(chunk, metadata)
254
-
255
- results.append({
256
- 'chunk': i,
257
- 'success': True,
258
- 'doc_id': doc_id,
259
- 'text': chunk[:200] + "..." if len(chunk) > 200 else chunk
260
- })
261
- except Exception as e:
262
- results.append({
263
- 'chunk': i,
264
- 'success': False,
265
- 'error': str(e)
266
- })
267
 
268
- return {
269
- 'success': True,
270
- 'total_chunks': len(chunks),
271
- 'results': results
272
- }
273
 
274
  except Exception as e:
275
- return {
276
- 'success': False,
277
- 'error': str(e)
278
- }
279
 
280
- def store_in_vector_db(self, text: str, metadata: Dict) -> str:
281
  """Store content in vector database"""
282
  doc_id = f"{metadata['source_file']}_{metadata['content_type']}_{metadata['chunk_id']}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
283
 
284
  self.collection.add(
285
- documents=[text],
286
  metadatas=[metadata],
287
  ids=[doc_id]
288
  )
289
 
290
  return doc_id
291
 
292
- def get_available_files(self) -> Dict[str, List[str]]:
293
- """Get list of all files in the database"""
294
  try:
295
- all_entries = self.collection.get(
296
- include=['metadatas']
297
- )
298
-
299
- files = {
300
- 'pdf': set(),
301
- 'xml': set()
302
- }
303
-
304
- for metadata in all_entries['metadatas']:
305
- file_type = metadata['content_type']
306
- file_name = metadata['source_file']
307
- files[file_type].add(file_name)
308
 
309
  return {
310
- 'pdf': sorted(list(files['pdf'])),
311
- 'xml': sorted(list(files['xml']))
312
  }
313
- except Exception as e:
314
- print(f"Error getting available files: {str(e)}")
315
- return {'pdf': [], 'xml': []}
316
 
317
- def ask_question_selective(self, question: str, selected_files: List[str], n_results: int = 5) -> str:
318
- """Ask a question using only the selected files"""
319
  try:
320
- filter_dict = {
321
- 'source_file': {'$in': selected_files}
322
- }
323
-
324
- results = self.collection.query(
325
- query_texts=[question],
326
- n_results=n_results,
327
- where=filter_dict,
328
- include=["documents", "metadatas"]
329
- )
330
-
331
- if not results['documents'][0]:
332
- return "No relevant content found in the selected files."
333
-
334
- # Format answer based on content type
335
- formatted_answer = []
336
- for doc, meta in zip(results['documents'][0], results['metadatas'][0]):
337
- if meta['content_type'] == 'xml':
338
- formatted_answer.append(f"Found in XML path: {meta['xml_path']}\n{doc}")
339
- else:
340
- formatted_answer.append(doc)
341
-
342
- # Create response using the matched content
343
- prompt = f"""Based on these relevant sections, please answer: {question}
344
-
345
- Relevant Content:
346
- {' '.join(formatted_answer)}
347
-
348
- Please provide a clear, concise answer based on the above content."""
349
-
350
- response = self.groq_client.chat.completions.create(
351
- messages=[{"role": "user", "content": prompt}],
352
- model="llama3-8b-8192",
353
- temperature=0.2
354
- )
355
 
356
- return response.choices[0].message.content
 
357
 
358
- except Exception as e:
359
- return f"Error processing your question: {str(e)}"
360
 
361
- def get_detailed_context(self, question: str, selected_files: List[str], n_results: int = 5) -> Dict:
362
- """Get detailed context including path and metadata information"""
363
- try:
364
- filter_dict = {
365
- 'source_file': {'$in': selected_files}
366
- }
367
-
368
- results = self.collection.query(
369
- query_texts=[question],
370
- n_results=n_results,
371
- where=filter_dict,
372
- include=["documents", "metadatas", "distances"]
373
- )
374
-
375
- if not results['documents'][0]:
376
- return {
377
- 'success': False,
378
- 'error': "No relevant content found"
379
- }
380
-
381
- detailed_results = []
382
- for doc, meta, distance in zip(results['documents'][0], results['metadatas'][0], results['distances'][0]):
383
- result_info = {
384
- 'content': doc,
385
- 'metadata': meta,
386
- 'similarity_score': round((1 - distance) * 100, 2), # Convert to percentage
387
- 'source_info': {
388
- 'file': meta['source_file'],
389
- 'type': meta['content_type'],
390
- 'path': meta.get('xml_path', 'N/A'),
391
- 'context': json.loads(meta['context']) if meta.get('context') else {}
392
- }
393
- }
394
- detailed_results.append(result_info)
395
-
396
  return {
397
  'success': True,
398
- 'results': detailed_results,
399
- 'query': question
400
  }
401
-
402
  except Exception as e:
403
  return {
404
  'success': False,
405
  'error': str(e)
406
  }
407
 
408
- def get_hierarchical_context(self, question: str, selected_files: List[str], n_results: int = 5) -> Dict:
409
- """Get hierarchical context for XML files including parent-child relationships"""
410
- try:
411
- # Get initial results
412
- initial_results = self.get_detailed_context(question, selected_files, n_results)
413
-
414
- if not initial_results['success']:
415
- return initial_results
416
 
417
- hierarchical_results = []
418
- for result in initial_results['results']:
419
- if result['metadata']['content_type'] == 'xml':
420
- # Get parent elements
421
- parent_path = '/'.join(result['source_info']['path'].split('/')[:-1])
422
- if parent_path:
423
- parent_filter = {
424
- 'source_file': {'$eq': result['metadata']['source_file']},
425
- 'xml_path': {'$eq': parent_path}
426
- }
427
- parent_results = self.collection.query(
428
- query_texts=[""], # Empty query to get exact match
429
- where=parent_filter,
430
- include=["documents", "metadatas"],
431
- n_results=1
432
- )
433
- if parent_results['documents'][0]:
434
- result['parent_info'] = {
435
- 'content': parent_results['documents'][0][0],
436
- 'metadata': parent_results['metadatas'][0][0]
437
- }
438
 
439
- # Get all potential children
440
- all_filter = {
441
- 'source_file': {'$eq': result['metadata']['source_file']}
442
- }
443
- all_results = self.collection.query(
444
- query_texts=[""],
445
- where=all_filter,
446
- include=["documents", "metadatas"],
447
- n_results=100
448
- )
449
 
450
- # Manually filter children
451
- children_info = []
452
- current_path = result['source_info']['path']
453
- if all_results['documents'][0]:
454
- for doc, meta in zip(all_results['documents'][0], all_results['metadatas'][0]):
455
- child_path = meta.get('xml_path', '')
456
- if (child_path.startswith(current_path + '/') and
457
- len(child_path.split('/')) == len(current_path.split('/')) + 1):
458
- children_info.append({
459
- 'content': doc,
460
- 'metadata': meta
461
- })
462
 
463
- if children_info:
464
- result['children_info'] = children_info[:5]
465
 
466
- hierarchical_results.append(result)
467
 
468
- return {
469
- 'success': True,
470
- 'results': hierarchical_results,
471
- 'query': question
472
- }
 
473
 
474
- except Exception as e:
475
- return {
476
- 'success': False,
477
- 'error': str(e)
478
- }
479
 
480
- def get_summary_and_details(self, question: str, selected_files: List[str]) -> Dict:
481
- """Get both a summary answer and detailed supporting information"""
482
- try:
483
- # Get hierarchical context first
484
- detailed_results = self.get_hierarchical_context(question, selected_files)
485
-
486
- if not detailed_results['success']:
487
- return detailed_results
488
 
489
- # Create summary prompt
490
- relevant_content = []
491
- for result in detailed_results['results']:
492
- if result['metadata']['content_type'] == 'xml':
493
- content_info = [
494
- f"XML Path: {result['source_info']['path']}",
495
- f"Content: {result['content']}"
496
- ]
497
- if 'parent_info' in result:
498
- content_info.append(f"Parent: {result['parent_info']['content']}")
499
- if 'children_info' in result:
500
- children_content = [child['content'] for child in result['children_info']]
501
- content_info.append(f"Related Elements: {', '.join(children_content)}")
502
- else:
503
- content_info = [f"Content: {result['content']}"]
504
 
505
- relevant_content.append('\n'.join(content_info))
 
506
 
507
- summary_prompt = (
508
- f"Based on the following content, please provide:\n"
509
- "1. A concise answer to the question\n"
510
- "2. Key supporting points\n"
511
- "3. Related context if relevant\n\n"
512
- f"Question: {question}\n\n"
513
- f"Content:\n{chr(10).join(relevant_content)}"
514
- )
515
 
516
- response = self.groq_client.chat.completions.create(
517
- messages=[{"role": "user", "content": summary_prompt}],
518
- model="llama3-8b-8192",
519
- temperature=0.2
520
- )
521
 
522
- return {
523
- 'success': True,
524
- 'summary': response.choices[0].message.content,
525
- 'details': detailed_results['results'],
526
- 'query': question
527
- }
528
 
529
- except Exception as e:
530
- return {
531
- 'success': False,
532
- 'error': str(e)
533
- }
534
-
535
-
536
- def process_file(self, file_path: str) -> Dict:
537
- """Process any supported file type"""
538
- try:
539
- file_extension = os.path.splitext(file_path)[1].lower()
540
-
541
- if file_extension == '.xml':
542
- return self.process_xml_file(file_path)
543
- elif file_extension == '.pdf':
544
- return self.process_pdf_file(file_path)
545
- else:
546
- return {
547
- 'success': False,
548
- 'error': f'Unsupported file type: {file_extension}'
549
- }
550
- except Exception as e:
551
- return {
552
- 'success': False,
553
- 'error': f'Error processing file: {str(e)}'
554
- }
555
-
556
- def calculate_detailed_score(self, distance: float, metadata: Dict, content: str, query: str) -> Dict:
557
- """
558
- Calculate a detailed, multi-faceted relevance score
559
-
560
- Components:
561
- 1. Vector Similarity (40%): Base similarity from embeddings
562
- 2. Content Match (20%): Direct term matching
563
- 3. Structural Relevance (20%): XML structure relevance (for XML files)
564
- 4. Context Completeness (10%): Completeness of metadata/context
565
- 5. Freshness (10%): How recent the content is
566
- """
567
- try:
568
- scores = {}
569
-
570
- # 1. Vector Similarity Score (40%)
571
- vector_similarity = 1 - distance # Convert distance to similarity
572
- scores['vector_similarity'] = {
573
- 'score': vector_similarity,
574
- 'weight': 0.4,
575
- 'weighted_score': vector_similarity * 0.4
576
- }
577
-
578
- # 2. Content Match Score (20%)
579
- content_match_score = self._calculate_content_match(content, query)
580
- scores['content_match'] = {
581
- 'score': content_match_score,
582
- 'weight': 0.2,
583
- 'weighted_score': content_match_score * 0.2
584
- }
585
-
586
- # 3. Structural Relevance Score (20%)
587
- if metadata['content_type'] == 'xml':
588
- structural_score = self._calculate_structural_relevance(metadata)
589
- else:
590
- structural_score = 0.5 # Default for non-XML
591
- scores['structural_relevance'] = {
592
- 'score': structural_score,
593
- 'weight': 0.2,
594
- 'weighted_score': structural_score * 0.2
595
- }
596
-
597
- # 4. Context Completeness Score (10%)
598
- context_score = self._calculate_context_completeness(metadata)
599
- scores['context_completeness'] = {
600
- 'score': context_score,
601
- 'weight': 0.1,
602
- 'weighted_score': context_score * 0.1
603
- }
604
-
605
- # 5. Freshness Score (10%)
606
- freshness_score = self._calculate_freshness(metadata['timestamp'])
607
- scores['freshness'] = {
608
- 'score': freshness_score,
609
- 'weight': 0.1,
610
- 'weighted_score': freshness_score * 0.1
611
- }
612
-
613
- # Calculate total score
614
- total_score = sum(s['weighted_score'] for s in scores.values())
615
-
616
- return {
617
- 'total_score': total_score,
618
- 'component_scores': scores,
619
- 'explanation': self._generate_score_explanation(scores)
620
- }
621
-
622
- except Exception as e:
623
- print(f"Error in score calculation: {str(e)}")
624
- return {
625
- 'total_score': 0.5,
626
- 'error': str(e)
627
- }
628
-
629
- def _calculate_content_match(self, content: str, query: str) -> float:
630
- """Calculate direct term matching score"""
631
- try:
632
- # Tokenize content and query
633
- content_terms = set(content.lower().split())
634
- query_terms = set(query.lower().split())
635
-
636
- # Calculate overlap
637
- matching_terms = content_terms.intersection(query_terms)
638
- if not query_terms:
639
- return 0.5
640
-
641
- # Calculate scores for exact matches and partial matches
642
- exact_match_score = len(matching_terms) / len(query_terms)
643
-
644
- # Check for partial matches
645
- partial_matches = 0
646
- for q_term in query_terms:
647
- for c_term in content_terms:
648
- if q_term in c_term or c_term in q_term:
649
- partial_matches += 0.5
650
-
651
- partial_match_score = partial_matches / len(query_terms)
652
-
653
- # Combine scores (70% exact matches, 30% partial matches)
654
- return (exact_match_score * 0.7) + (partial_match_score * 0.3)
655
-
656
- except Exception as e:
657
- print(f"Error in content match calculation: {str(e)}")
658
- return 0.5
659
 
660
- def _calculate_structural_relevance(self, metadata: Dict) -> float:
661
- """Calculate structural relevance score for XML content"""
662
- try:
663
- score = 0.5 # Base score
664
-
665
- if 'xml_path' in metadata:
666
- path = metadata['xml_path']
667
-
668
- # Score based on path depth (deeper paths might be more specific)
669
- depth = len(path.split('/'))
670
- depth_score = min(depth / 5, 1.0) # Normalize depth score
671
-
672
- # Score based on element type
673
- element_type = metadata.get('element_type', '')
674
- type_scores = {
675
- 'UAObjectType': 0.9,
676
- 'UAVariableType': 0.9,
677
- 'UAObject': 0.8,
678
- 'UAVariable': 0.8,
679
- 'UAMethod': 0.7,
680
- 'UAView': 0.6,
681
- 'UAReferenceType': 0.7
682
- }
683
- type_score = type_scores.get(element_type, 0.5)
684
-
685
- # Score based on context completeness
686
- context = json.loads(metadata.get('context', '{}'))
687
- context_score = len(context) / 10 if context else 0.5
688
-
689
- # Combine scores
690
- score = (depth_score * 0.3) + (type_score * 0.4) + (context_score * 0.3)
691
-
692
- return score
693
-
694
- except Exception as e:
695
- print(f"Error in structural relevance calculation: {str(e)}")
696
- return 0.5
697
 
698
- def _calculate_context_completeness(self, metadata: Dict) -> float:
699
- """Calculate context completeness score"""
700
- try:
701
- expected_fields = {
702
- 'xml': ['xml_path', 'element_type', 'context', 'chunk_id', 'total_chunks'],
703
- 'pdf': ['chunk_id', 'total_chunks', 'chunk_size']
704
- }
705
-
706
- content_type = metadata.get('content_type', '')
707
- if content_type not in expected_fields:
708
- return 0.5
709
-
710
- # Check for presence of expected fields
711
- expected = expected_fields[content_type]
712
- present_fields = sum(1 for field in expected if field in metadata)
713
-
714
- # Calculate base completeness score
715
- completeness = present_fields / len(expected)
716
-
717
- # Add bonus for additional useful metadata
718
- bonus = 0
719
- if content_type == 'xml':
720
- context = json.loads(metadata.get('context', '{}'))
721
- if context:
722
- bonus += 0.2
723
-
724
- return min(completeness + bonus, 1.0)
725
-
726
- except Exception as e:
727
- print(f"Error in context completeness calculation: {str(e)}")
728
- return 0.5
729
 
730
- def _calculate_freshness(self, timestamp: str) -> float:
731
- """Calculate freshness score based on timestamp"""
732
- try:
733
- # Parse timestamp
734
- doc_time = datetime.datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S.%f')
735
- now = datetime.datetime.now()
736
-
737
- # Calculate age in hours
738
- age_hours = (now - doc_time).total_seconds() / 3600
739
-
740
- # Score decreases with age (24 hours = 1 day)
741
- if age_hours < 24:
742
- return 1.0
743
- elif age_hours < 168: # 1 week
744
- return 0.8
745
- elif age_hours < 720: # 1 month
746
- return 0.6
747
- else:
748
- return 0.4
749
-
750
- except Exception as e:
751
- print(f"Error in freshness calculation: {str(e)}")
752
- return 0.5
753
 
754
- def _generate_score_explanation(self, scores: Dict) -> str:
755
- """Generate human-readable explanation of scores"""
756
- try:
757
- explanations = [
758
- f"Total Score: {scores['total_score']:.2f}",
759
- "\nComponent Scores:",
760
- f"• Vector Similarity: {scores['vector_similarity']['score']:.2f} (40% weight)",
761
- f"• Content Match: {scores['content_match']['score']:.2f} (20% weight)",
762
- f"• Structural Relevance: {scores['structural_relevance']['score']:.2f} (20% weight)",
763
- f"• Context Completeness: {scores['context_completeness']['score']:.2f} (10% weight)",
764
- f"• Freshness: {scores['freshness']['score']:.2f} (10% weight)"
765
- ]
766
- return "\n".join(explanations)
767
-
768
- except Exception as e:
769
- print(f"Error generating score explanation: {str(e)}")
770
- return "Score explanation unavailable"
 
1
+ from typing import List, Dict, Union, Optional
2
  from groq import Groq
3
  import chromadb
4
  import os
 
18
  embeddings = self.model.encode(input)
19
  return embeddings.tolist()
20
 
21
+ class EnhancedXMLProcessor:
22
+ def __init__(self):
23
+ self.processed_nodes = set()
24
+ self.reference_map = {}
25
+ self.node_info = {}
26
+
27
+ def build_reference_map(self, root) -> None:
28
+ """Build a map of all node references for faster lookup"""
29
+ for element in root.findall('.//*'):
30
+ node_id = element.get('NodeId')
31
+ if node_id:
32
+ self.node_info[node_id] = {
33
+ 'tag': element.tag,
34
+ 'browse_name': element.get('BrowseName', ''),
35
+ 'display_name': self._get_display_name(element),
36
+ 'description': self._get_description(element),
37
+ 'data_type': element.get('DataType', ''),
38
+ 'references': []
39
+ }
40
+
41
+ refs = element.find('References')
42
+ if refs is not None:
43
+ for ref in refs.findall('Reference'):
44
+ ref_type = ref.get('ReferenceType')
45
+ is_forward = ref.get('IsForward', 'true') == 'true'
46
+ target = ref.text
47
+
48
+ if ref_type in ['HasComponent', 'HasProperty', 'HasTypeDefinition']:
49
+ self.reference_map.setdefault(node_id, []).append({
50
+ 'type': ref_type,
51
+ 'target': target,
52
+ 'is_forward': is_forward
53
+ })
54
+ self.node_info[node_id]['references'].append({
55
+ 'type': ref_type,
56
+ 'target': target,
57
+ 'is_forward': is_forward
58
+ })
59
+
60
+ def _get_display_name(self, element) -> str:
61
+ """Extract display name from element"""
62
+ display_name = element.find('DisplayName')
63
+ if display_name is not None:
64
+ return display_name.text
65
+ return ''
66
+
67
+ def _get_description(self, element) -> str:
68
+ """Extract description from element"""
69
+ desc = element.find('Description')
70
+ if desc is not None:
71
+ return desc.text
72
+ return ''
73
+
74
+ def generate_natural_language(self, node_id: str, depth: int = 0, visited: set = None) -> List[str]:
75
+ """Generate natural language description for a node and its children"""
76
+ if visited is None:
77
+ visited = set()
78
+
79
+ if node_id in visited:
80
+ return []
81
+
82
+ visited.add(node_id)
83
+ descriptions = []
84
+
85
+ node = self.node_info.get(node_id)
86
+ if not node:
87
+ return []
88
+
89
+ base_desc = self._build_base_description(node, depth)
90
+ if base_desc:
91
+ descriptions.append(base_desc)
92
+
93
+ if node_id in self.reference_map:
94
+ child_descriptions = self._process_forward_references(node_id, depth + 1, visited)
95
+ descriptions.extend(child_descriptions)
96
+
97
+ return descriptions
98
+
99
+ def _build_base_description(self, node: Dict, depth: int) -> str:
100
+ """Build the base description for a node"""
101
+ indentation = " " * depth
102
+ desc_parts = []
103
+
104
+ if node['browse_name']:
105
+ browse_name = node['browse_name'].split(':')[-1]
106
+ desc_parts.append(f"a {browse_name}")
107
+
108
+ if node['display_name']:
109
+ desc_parts.append(f"(displayed as '{node['display_name']}')")
110
+
111
+ if node['data_type']:
112
+ desc_parts.append(f"of type {node['data_type']}")
113
+
114
+ if node['description']:
115
+ desc_parts.append(f"which {node['description']}")
116
+
117
+ if desc_parts:
118
+ return f"{indentation}Contains {' '.join(desc_parts)}"
119
+ return ""
120
+
121
+ def _process_forward_references(self, node_id: str, depth: int, visited: set) -> List[str]:
122
+ """Process forward references to build hierarchical descriptions"""
123
+ descriptions = []
124
+
125
+ for ref in self.reference_map.get(node_id, []):
126
+ if ref['is_forward'] and ref['type'] in ['HasComponent', 'HasProperty']:
127
+ target_descriptions = self.generate_natural_language(ref['target'], depth, visited)
128
+ descriptions.extend(target_descriptions)
129
+
130
+ return descriptions
131
+
132
+ def generate_complete_description(self, root) -> str:
133
+ """Generate a complete natural language description of the XML structure"""
134
+ self.build_reference_map(root)
135
+ root_descriptions = []
136
+
137
+ for node_id in self.node_info:
138
+ is_root = True
139
+ for ref_list in self.reference_map.values():
140
+ for ref in ref_list:
141
+ if not ref['is_forward'] and ref['type'] == 'HasComponent' and ref['target'] == node_id:
142
+ is_root = False
143
+ break
144
+ if not is_root:
145
+ break
146
+
147
+ if is_root:
148
+ descriptions = self.generate_natural_language(node_id)
149
+ root_descriptions.extend(descriptions)
150
+
151
+ return "\n".join(root_descriptions)
152
+
153
  class UnifiedDocumentProcessor:
154
  def __init__(self, groq_api_key, collection_name="unified_content"):
155
  """Initialize the processor with necessary clients"""
 
157
 
158
  # XML-specific settings
159
  self.max_elements_per_chunk = 50
160
+ self.xml_processor = EnhancedXMLProcessor()
161
 
162
  # PDF-specific settings
163
  self.pdf_chunk_size = 500
 
185
  )
186
 
187
  def _initialize_nltk(self):
188
+ """Ensure NLTK's `punkt` tokenizer resource is available."""
189
  try:
190
+ nltk.data.find('tokenizers/punkt')
191
+ except LookupError:
192
+ print("Downloading NLTK 'punkt' tokenizer...")
193
  nltk.download('punkt')
194
+
195
+ def flatten_xml_to_text(self, element, depth=0) -> str:
196
+ """Convert XML to natural language using the enhanced processor"""
197
+ try:
198
+ return self.xml_processor.generate_complete_description(element)
199
  except Exception as e:
200
+ print(f"Error in enhanced XML processing: {str(e)}")
201
+ return self._original_flatten_xml_to_text(element, depth)
202
+
203
+ def _original_flatten_xml_to_text(self, element, depth=0) -> str:
204
+ """Original fallback XML flattening implementation"""
205
+ text_parts = []
 
206
 
207
+ element_info = f"Element: {element.tag}"
208
+ if element.attrib:
209
+ element_info += f", Attributes: {json.dumps(element.attrib)}"
210
+ if element.text and element.text.strip():
211
+ element_info += f", Text: {element.text.strip()}"
212
+ text_parts.append(element_info)
213
+
214
+ for child in element:
215
+ child_text = self._original_flatten_xml_to_text(child, depth + 1)
216
+ text_parts.append(child_text)
217
 
218
+ return "\n".join(text_parts)
219
 
220
  def extract_text_from_pdf(self, pdf_path: str) -> str:
221
  """Extract text from PDF file"""
 
231
 
232
  def chunk_text(self, text: str) -> List[str]:
233
  """Split text into chunks while preserving sentence boundaries"""
234
+ sentences = sent_tokenize(text)
235
  chunks = []
236
  current_chunk = []
237
  current_size = 0
 
258
 
259
  return chunks
260
 
261
+ def chunk_xml_text(self, text: str, max_chunk_size: int = 2000) -> List[str]:
262
+ """Split flattened XML text into manageable chunks"""
263
+ lines = text.split('\n')
264
+ chunks = []
265
+ current_chunk = []
266
+ current_size = 0
267
 
268
+ for line in lines:
269
+ line_size = len(line)
270
+ if current_size + line_size > max_chunk_size and current_chunk:
271
+ chunks.append('\n'.join(current_chunk))
272
+ current_chunk = []
273
+ current_size = 0
274
+ current_chunk.append(line)
275
+ current_size += line_size
276
 
277
+ if current_chunk:
278
+ chunks.append('\n'.join(current_chunk))
279
 
280
+ return chunks
 
281
 
282
+ def generate_natural_language(self, content: Union[List[Dict], str], content_type: str) -> str:
283
+ """Generate natural language description with improved error handling and chunking"""
284
+ try:
285
+ if content_type == "xml":
286
+ prompt = f"Convert this XML structure description to a natural language summary that preserves the hierarchical relationships: {content}"
287
+ else: # pdf
288
+ prompt = f"Summarize this text while preserving key information: {content}"
289
 
290
+ max_prompt_length = 4000
291
+ if len(prompt) > max_prompt_length:
292
+ prompt = prompt[:max_prompt_length] + "..."
 
 
293
 
294
+ response = self.groq_client.chat.completions.create(
295
+ messages=[{"role": "user", "content": prompt}],
296
+ model="llama3-8b-8192",
297
+ max_tokens=1000
298
+ )
299
+ return response.choices[0].message.content
300
  except Exception as e:
301
+ print(f"Error generating natural language: {str(e)}")
302
+ if len(content) > 2000:
303
+ half_length = len(content) // 2
304
+ first_half = content[:half_length]
305
+ try:
306
+ return self.generate_natural_language(first_half, content_type)
307
+ except:
308
+ return None
309
+ return None
310
 
311
+ def store_in_vector_db(self, natural_language: str, metadata: Dict) -> str:
312
  """Store content in vector database"""
313
  doc_id = f"{metadata['source_file']}_{metadata['content_type']}_{metadata['chunk_id']}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}"
314
 
315
  self.collection.add(
316
+ documents=[natural_language],
317
  metadatas=[metadata],
318
  ids=[doc_id]
319
  )
320
 
321
  return doc_id
322
 
323
+ def process_file(self, file_path: str) -> Dict:
324
+ """Process any supported file type"""
325
  try:
326
+ file_extension = os.path.splitext(file_path)[1].lower()
327
 
328
+ if file_extension == '.xml':
329
+ return self.process_xml_file(file_path)
330
+ elif file_extension == '.pdf':
331
+ return self.process_pdf_file(file_path)
332
+ else:
333
+ return {
334
+ 'success': False,
335
+ 'error': f'Unsupported file type: {file_extension}'
336
+ }
337
+ except Exception as e:
338
  return {
339
+ 'success': False,
340
+ 'error': f'Error processing file: {str(e)}'
341
  }
342
 
343
+ def process_xml_file(self, xml_file_path: str) -> Dict:
344
+ """Process XML file with improved chunking"""
345
  try:
346
+ tree = ET.parse(xml_file_path)
347
+ root = tree.getroot()
348
+ flattened_text = self.flatten_xml_to_text(root)
349
+ chunks = self.chunk_xml_text(flattened_text)
350
 
351
+ print(f"Split XML into {len(chunks)} chunks")
352
+ results = []
353
 
354
+ for i, chunk in enumerate(chunks):
355
+ print(f"Processing XML chunk {i+1}/{len(chunks)}")
356
+ try:
357
+ natural_language = self.generate_natural_language(chunk, "xml")
358
+
359
+ if natural_language:
360
+ metadata = {
361
+ 'source_file': os.path.basename(xml_file_path),
362
+ 'content_type': 'xml',
363
+ 'chunk_id': i,
364
+ 'total_chunks': len(chunks),
365
+ 'timestamp': str(datetime.datetime.now())
366
+ }
367
+ doc_id = self.store_in_vector_db(natural_language, metadata)
368
+ results.append({
369
+ 'chunk': i,
370
+ 'success': True,
371
+ 'doc_id': doc_id,
372
+ 'natural_language': natural_language
373
+ })
374
+ else:
375
+ results.append({
376
+ 'chunk': i,
377
+ 'success': False,
378
+ 'error': 'Failed to generate natural language'
379
+ })
380
+ except Exception as e:
381
+ print(f"Error processing chunk {i}: {str(e)}")
382
+ results.append({
383
+ 'chunk': i,
384
+ 'success': False,
385
+ 'error': str(e)
386
+ })
387
 
388
  return {
389
  'success': True,
390
+ 'total_chunks': len(chunks),
391
+ 'results': results
392
  }
393
+
394
  except Exception as e:
395
  return {
396
  'success': False,
397
  'error': str(e)
398
  }
399
 
400
+ def process_pdf_file(self, pdf_file_path: str) -> Dict:
401
+ """Process PDF file"""
402
+ try:
403
+ full_text = self.extract_text_from_pdf(pdf_file_path)
404
+ chunks = self.chunk_text(full_text)
405
 
406
+ print(f"Split PDF into {len(chunks)} chunks")
407
+ results = []
408
 
409
+ for i, chunk in enumerate(chunks):
410
+ print(f"Processing PDF chunk {i+1}/{len(chunks)}")
411
+ natural_language = self.generate_natural_language(chunk, "pdf")
412
 
413
+ if natural_language:
414
+ metadata = {
415
+ 'source_file': os.path.basename(pdf_file_path),
416
+ 'content_type': 'pdf',
417
+ 'chunk_id': i,
418
+ 'total_chunks': len(chunks),
419
+ 'timestamp': str(datetime.datetime.now()),
420
+ 'chunk_size': len(chunk.split())
421
+ }
422
+ doc_id = self.store_in_vector_db(natural_language, metadata)
423
+ results.append({
424
+ 'chunk': i,
425
+ 'success': True,
426
+ 'doc_id': doc_id,
427
+ 'natural_language': natural_language,
428
+ 'original_text': chunk[:200] + "..."
429
+ })
430
+ else:
431
+ results.append({
432
+ 'chunk': i,
433
+ 'success': False,
434
+ 'error': 'Failed to generate natural language summary'
435
+ })
436
 
437
+ return {
438
+ 'success': True,
439
+ 'total_chunks': len(chunks),
440
+ 'results': results
441
+ }
442
 
443
+ except Exception as e:
444
+ return {
445
+ 'success': False,
446
+ 'error': str(e)
447
+ }
448
 
449
+ def get_available_files(self) -> Dict[str, List[str]]:
450
+ """Get list of all files in the database"""
451
+ try:
452
+ all_entries = self.collection.get(
453
+ include=['metadatas']
454
+ )
455
 
456
+ files = {
457
+ 'pdf': set(),
458
+ 'xml': set()
459
+ }
 
460
 
461
+ for metadata in all_entries['metadatas']:
462
+ file_type = metadata['content_type']
463
+ file_name = metadata['source_file']
464
+ files[file_type].add(file_name)
465
 
466
+ return {
467
+ 'pdf': sorted(list(files['pdf'])),
468
+ 'xml': sorted(list(files['xml']))
469
+ }
470
+ except Exception as e:
471
+ print(f"Error getting available files: {str(e)}")
472
+ return {'pdf': [], 'xml': []}
473
 
474
+ def ask_question_selective(self, question: str, selected_files: List[str], n_results: int = 5) -> str:
475
+ """Ask a question using only the selected files"""
476
+ try:
477
+ filter_dict = {
478
+ 'source_file': {'$in': selected_files}
479
+ }
480
 
481
+ results = self.collection.query(
482
+ query_texts=[question],
483
+ n_results=n_results,
484
+ where=filter_dict,
485
+ include=["documents", "metadatas"]
486
+ )
 
 
487
 
488
+ if not results['documents'][0]:
489
+ return "No relevant content found in the selected files."
490
 
491
+ context = "\n\n".join(results['documents'][0])
492
 
493
+ prompt = f"""Based on the following content from the selected files, please answer this question: {question}
494
 
495
+ Content:
496
+ {context}
497
 
498
+ Please provide a direct answer based only on the information provided above."""
499
 
500
+ response = self.groq_client.chat.completions.create(
501
+ messages=[{"role": "user", "content": prompt}],
502
+ model="llama3-8b-8192",
503
+ temperature=0.2
504
+ )
505
 
506
+ return response.choices[0].message.content
507
+
508
+ except Exception as e:
509
+ return f"Error processing your question: {str(e)}"
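A minimal usage sketch of the updated processor API, assuming a GROQ_API_KEY environment variable and placeholder input files sample.xml / sample.pdf (both names are illustrative, not part of the commit):

# Minimal usage sketch; GROQ_API_KEY, sample.xml and sample.pdf are assumptions
import os
from unified_document_processor import UnifiedDocumentProcessor

processor = UnifiedDocumentProcessor(groq_api_key=os.environ["GROQ_API_KEY"])

# Ingest files: XML is flattened to natural language and chunked, PDFs are chunked and summarized
for path in ["sample.xml", "sample.pdf"]:
    result = processor.process_file(path)
    print(path, "->", result["success"], result.get("total_chunks"))

# List stored files, then ask a question restricted to those files
files = processor.get_available_files()
answer = processor.ask_question_selective(
    "What variables does the device object expose?",
    selected_files=files["xml"] + files["pdf"],
)
print(answer)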