import json
import logging
import os
import re
from typing import Any, Dict, List

from bs4 import BeautifulSoup
from dotenv import load_dotenv

from haystack import Document, Pipeline, component
from haystack.components.embedders import OpenAIDocumentEmbedder
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore

load_dotenv(".env")
open_ai_key = os.environ.get("OPENAI_API_KEY")

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def read_jsonl_file(file_path):
    """
    Read a JSONL (JSON Lines) file and return a list of dictionaries,
    one per valid JSON object. Lines with JSON decoding errors are skipped.

    :param file_path: The path to the JSONL file.
    :return: A list of dictionaries, each representing a parsed JSON object.
    """
    data = []

    try:
        with open(file_path, "r") as file:
            for line in file:
                try:
                    # Attempt to load the JSON data from the current line
                    json_data = json.loads(line)
                    data.append(json_data)
                except json.JSONDecodeError as e:
                    # Log an error message for any line that can't be decoded
                    logger.error(f"Error decoding JSON on line: {line[:30]}... - {e}")
    except FileNotFoundError as e:
        logger.error(f"File not found: {e}")

    return data
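
# For reference, the record shape this loader is expected to yield, inferred
# from the fields the pipeline below actually reads ("headline" and "content";
# any other keys simply pass through as document metadata). The values shown
# here are illustrative, not real data:
# {"headline": "Example headline", "content": "<p>Article body ...</p>"}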


@component
class BenzingaNews:

    @component.output_types(documents=List[Document])
    def run(self, sources: List[Dict[str, Any]]) -> Dict[str, List[Document]]:
        logger.info("Starting BenzingaNews.run with sources")
        documents = []
        try:
            for source in sources:
                logger.debug(f"Processing source: {source.get('headline', 'Unknown headline')}")
                # Clean every string field of the article in place
                for key in source:
                    if isinstance(source[key], str):
                        source[key] = self.clean_text(source[key])

                if source["content"] == "":
                    logger.warning(f"Skipping source due to empty content: {source.get('headline', 'Unknown headline')}")
                    continue

                # Create a Document with the cleaned content and the full article as metadata
                content = source["content"]
                document = Document(content=content, meta=source)
                documents.append(document)

            logger.info(f"Successfully processed {len(documents)} documents.")

        except Exception as e:
            logger.error(f"Error during BenzingaNews.run: {e}")

        return {"documents": documents}

    def clean_text(self, text):
        logger.debug("Cleaning text content.")
        try:
            # Remove HTML tags using BeautifulSoup
            soup = BeautifulSoup(text, "html.parser")
            text = soup.get_text()
            # Collapse extra whitespace into single spaces
            text = re.sub(r"\s+", " ", text).strip()
            logger.debug("Text cleaned successfully.")
        except Exception as e:
            logger.error(f"Error during text cleaning: {e}")
            raise
        return text


@component
class BenzingaEmbeder:

    def __init__(self):
        logger.info("Initializing BenzingaEmbeder pipeline.")
        try:
            get_news = BenzingaNews()
            document_store = ElasticsearchDocumentStore(
                embedding_similarity_function="cosine",
                hosts="http://localhost:9200",
            )
            document_cleaner = DocumentCleaner(
                remove_empty_lines=True,
                remove_extra_whitespaces=True,
                remove_repeated_substrings=False,
            )
            document_splitter = DocumentSplitter(split_by="passage", split_length=5)
            document_writer = DocumentWriter(
                document_store=document_store,
                policy=DuplicatePolicy.OVERWRITE,
            )
            embedding = OpenAIDocumentEmbedder(api_key=Secret.from_token(open_ai_key))

            self.pipeline = Pipeline()
            self.pipeline.add_component("get_news", get_news)
            self.pipeline.add_component("document_cleaner", document_cleaner)
            self.pipeline.add_component("document_splitter", document_splitter)
            self.pipeline.add_component("embedding", embedding)
            self.pipeline.add_component("document_writer", document_writer)

            self.pipeline.connect("get_news", "document_cleaner")
            self.pipeline.connect("document_cleaner", "document_splitter")
            self.pipeline.connect("document_splitter", "embedding")
            self.pipeline.connect("embedding", "document_writer")

            logger.info("Pipeline initialized successfully.")
        except Exception as e:
            logger.error(f"Error during BenzingaEmbeder initialization: {e}")
            raise

    @component.output_types(documents=List[Document])
    def run(self, event: Dict[str, Any]):
        logger.info(f"Running BenzingaEmbeder with event: {event}")
        try:
            # BenzingaNews expects a list of article dicts, so the single
            # event is wrapped in a list here.
            documents = self.pipeline.run({"get_news": {"sources": [event]}})
            self.pipeline.draw("benzinga_pipeline.png")
            logger.info("Pipeline executed successfully; graph saved to benzinga_pipeline.png.")
            return documents
        except Exception as e:
            logger.error(f"Error during pipeline execution: {e}")
            raise
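

# --- Usage sketch ---
# A minimal example of how these pieces fit together. The file name
# "news_out.jsonl" is a hypothetical placeholder; the sketch also assumes a
# reachable Elasticsearch instance at http://localhost:9200 and a valid
# OPENAI_API_KEY in .env, as the code above already requires.
if __name__ == "__main__":
    embedder = BenzingaEmbeder()
    articles = read_jsonl_file("news_out.jsonl")  # hypothetical path
    for article in articles:
        # Each record is one article dict; BenzingaEmbeder.run wraps it in a
        # list before handing it to BenzingaNews.
        embedder.run(article)
        logger.info(f"Indexed article: {article.get('headline', 'Unknown headline')}")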