Kafka Producer
The Kafka producer collects news articles from various sources and sends them to a Kafka broker. With the help of a Kafka consumer, we can enable real-time data streaming and use this data to train the news classifier.
Fetching News articles with different APIs and RSS feeds
# Fetch articles related to the provided query string from RapidAPI news API
def GetRapidAPIFeed(queryString):
    """Fetch English-language articles matching ``queryString`` from the RapidAPI free-news search endpoint.

    Args:
        queryString: Search term sent as the ``q`` request parameter.

    Returns:
        A list with one dict per article, each holding exactly the keys in
        ``attributeList``. Returns ``None`` when the payload does not have the
        expected shape (no ``articles`` list, or an article that lacks one of
        the attributes).
    """
    attributeList = ["title", "published_date", "link", "clean_url", "summary", "media", "topic"]
    url = "https://free-news.p.rapidapi.com/v1/search"
    querystring = {"q": queryString, "lang": "en"}
    headers = {
        'x-rapidapi-host': "free-news.p.rapidapi.com",
        # NOTE(review): the key is blank here; a real RapidAPI key must be supplied.
        'x-rapidapi-key': ""
    }
    response = requests.request("GET", url, headers=headers, params=querystring)
    json_response = response.json()
    try:
        # The article records are under "articles". The previous code iterated
        # "page_size", which is a count, so the loop always failed and the
        # function returned None for every query.
        return [
            {attribute: article[attribute] for attribute in attributeList}
            for article in json_response["articles"]
        ]
    except (KeyError, TypeError):
        # Error payloads or malformed articles: keep the historical "None on failure" contract.
        return None
# Fetch articles for each query string in the provided list from RapidAPI news API
def GetRapidAPIData(queryStringList):
    """Run ``GetRapidAPIFeed`` for every query string and keep the non-empty results.

    Returns a list with one entry per successful query; each entry is the
    article list returned for that query (results are not flattened).
    """
    fetched = (GetRapidAPIFeed(queryString) for queryString in queryStringList)
    return [articles for articles in fetched if articles]
# Fetches articles from a collection of RSS feed URLs categorized by topics
def GetRssFeed(resources: object) -> list:
    """Collect articles for every category in ``resources``.

    ``resources`` maps a category name to either one feed URL or a list of
    feed URLs. Every URL is passed to ``GetArticles`` and the results are
    concatenated into one list.
    """
    collected = []
    for category in resources:
        entry = resources[category]
        # Treat a single URL as a one-element list so both forms share one loop.
        links = entry if type(entry) == list else [entry]
        for link in links:
            collected += GetArticles(category, link)
    return collected
# Retrieves articles from a given RSS feed URL and extracts relevant text data
def GetArticles(category: str, url: str) -> list:
    """Download one RSS feed and return its items as labelled text records.

    Args:
        category: Label stored with every article taken from this feed.
        url: Address of the RSS feed.

    Returns:
        A list of ``{"text": ..., "category": ...}`` dicts, where ``text`` is
        the item's title followed by its description with markup removed by
        ``CleanArticle``. Items that cannot be processed are skipped.
    """
    res = requests.get(url)
    content = BeautifulSoup(res.content, features='xml')  # Parse RSS content using the XML parser
    articleData = []
    for article in content.findAll('item'):  # Every <item> element is one article
        try:
            title = article.find('title').text
            description = article.find('description').text
            text = CleanArticle(title + description)
        except Exception:
            # Best effort: skip an item with a missing title/description or
            # text that cannot be cleaned. Catching Exception rather than using
            # a bare except lets KeyboardInterrupt/SystemExit propagate.
            continue
        articleData.append({"text": text, "category": category})
    return articleData
#Removes HTML tags in text
def CleanArticle(text: str) -> str:
    """Return ``text`` with every ``<...>`` span removed.

    Everything from a ``<`` up to and including the next ``>`` is dropped.
    A ``<`` that is never closed is treated as a tag running to the end of
    the string, so the remainder is dropped instead of raising ``IndexError``.
    A ``>`` with no preceding ``<`` is kept as ordinary text.
    """
    parts = []
    pos = 0
    length = len(text)
    while pos < length:
        start = text.find('<', pos)
        if start == -1:
            # No more tags: keep the rest unchanged.
            parts.append(text[pos:])
            break
        parts.append(text[pos:start])
        end = text.find('>', start)
        if end == -1:
            # Unterminated tag: nothing after it is plain text.
            break
        pos = end + 1
    # Joining once avoids the quadratic cost of repeated string concatenation.
    return "".join(parts)
#example usage
# article = GetCNNFeed()
# article = GetRssFeed(CNNResources)
# article += GetRssFeed(NewYorkTimeResources)
# article += GetRssFeed(TheGuardianResources)
# data = pd.DataFrame(article, columns=['text','category'])
# with open("test.csv",'w') as file:
# data.to_csv(file)
Kafka Producer
KafkaProducer.py produces the data to the Kafka topic named newsarticles. The script runs in an infinite loop, producing fresh data every 24 hours. This ensures a continuous flow of data into the Kafka topic for real-time processing.
from RapidAPIFeed import GetRapidAPIFeed
from json import dumps
from kafka import KafkaProducer
import time
from RssFeed import GetRssFeed
from APIFeed import *
# RSS feed catalogues, one per publisher. Each maps a category label to either
# a single feed URL or a list of feed URLs; GetRssFeed accepts both forms.
# CNN feeds.
CNNResources = {
"business" :"http://rss.cnn.com/rss/money_latest.rss",
"politics": "http://rss.cnn.com/rss/cnn_allpolitics.rss",
"tech":"http://rss.cnn.com/rss/cnn_tech.rss",
"health":"http://rss.cnn.com/rss/cnn_health.rss",
"entertainment": "http://rss.cnn.com/rss/cnn_showbiz.rss"
}
# Reuters feeds.
ReutersResources = {
"business" :"https://www.reutersagency.com/feed/?best-topics=business-finance&post_type=best",
"politics": "https://www.reutersagency.com/feed/?best-topics=political-general&post_type=best",
"tech":"https://www.reutersagency.com/feed/?best-topics=tech&post_type=best",
"health":"https://www.reutersagency.com/feed/?best-topics=health&post_type=best",
"entertainment": "https://www.reutersagency.com/feed/?best-topics=lifestyle-entertainment&post_type=best",
"sports": "https://www.reutersagency.com/feed/?best-topics=sports&post_type=best",
}
# New York Times feeds; "entertainment" combines several section feeds.
NewYorkTimeResources = {
"world": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml",
"business" :"https://rss.nytimes.com/services/xml/rss/nyt/Business.xml",
"politics": "https://rss.nytimes.com/services/xml/rss/nyt/Politics.xml",
"tech":"https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml",
"health":"https://rss.nytimes.com/services/xml/rss/nyt/Health.xml",
"entertainment": ["https://rss.nytimes.com/services/xml/rss/nyt/Movies.xml","https://rss.nytimes.com/services/xml/rss/nyt/Music.xml","https://rss.nytimes.com/services/xml/rss/nyt/Television.xml","https://rss.nytimes.com/services/xml/rss/nyt/Theater.xml"],
"sports": "https://rss.nytimes.com/services/xml/rss/nyt/Sports.xml",
"science": "https://rss.nytimes.com/services/xml/rss/nyt/Science.xml"
}
# The Guardian feeds; "entertainment" combines several section feeds.
TheGuardianResources = {
"world": "https://www.theguardian.com/world/rss",
"sports": "https://www.theguardian.com/uk/sport/rss",
"tech": "https://www.theguardian.com/uk/technology/rss",
"science": "https://www.theguardian.com/science/rss",
"entertainment":["https://www.theguardian.com/music/rss","https://www.theguardian.com/us/film/rss","https://www.theguardian.com/games/rss"]
}
# Search terms that produceData passes, one at a time, to GetRapidAPIFeed.
queryStringList = ["Bitcoin", "Harry Potter", "Egyptian Pyramids", "Fifa", "Engineering", "Bank Robbery"]
# Function to produce data and send it to Kafka topic
def produceData():
    """Collect articles from every configured source and publish them to Kafka.

    One message holding all RSS articles is sent to the ``newsarticles``
    topic, followed by one message per RapidAPI query that returned results.
    Values are JSON-encoded as UTF-8. The producer is flushed and closed
    before returning, so repeated calls do not accumulate open producers.
    """
    # Create a Kafka producer to send messages to the Kafka broker
    producer = KafkaProducer(
        bootstrap_servers=['kafka:29092'],
        value_serializer=lambda x: dumps(x).encode('utf-8'),
    )
    try:
        # Get news articles from different RSS feeds and send to Kafka topic
        articles = GetRssFeed(CNNResources)
        articles += GetRssFeed(NewYorkTimeResources)
        articles += GetRssFeed(ReutersResources)
        articles += GetRssFeed(TheGuardianResources)
        producer.send(topic="newsarticles", value=articles)
        # Get news articles from RapidAPI using different query strings and send to Kafka topic
        for queryString in queryStringList:
            articles = GetRapidAPIFeed(queryString)
            if articles:
                producer.send(topic="newsarticles", value=articles)
        # articles = NYAPIFeed()
        # producer.send(topic = "newsarticles", value = articles)
        # send() only queues the record; flush so everything is delivered
        # before the producer is closed.
        producer.flush()
    finally:
        # Release the producer even when a fetch or send fails.
        producer.close()
# Infinite loop to produce data every 24 hours
# Run forever: publish a fresh batch, then wait 24 hours before the next one.
while True:
    try:
        produceData()
    except Exception as error:
        # Keep the scheduler alive after a failed run, but report what went
        # wrong. Catching Exception (not a bare except) lets Ctrl-C and
        # SystemExit stop the process.
        print("except", repr(error))
    time.sleep(60*60*24)
Kafka Consumer
The Kafka consumer reads data from a Kafka topic and stores it in a MongoDB database for further use.
Storing data in mongodb
Function to store data in mongodb
from pymongo import MongoClient
def InjectToMongodb(articleList: list) -> bool:
    """Insert every article that is not already stored into MongoDB.

    Args:
        articleList: Article documents (dicts) to store in the
            ``bigdatanewsclassification.news`` collection.

    Returns:
        ``True`` when every article was processed, ``False`` if connecting,
        querying or inserting raised an error.
    """
    client = None
    try:
        # NOTE(review): the connection string is blank here; a real MongoDB URI must be supplied.
        client = MongoClient("")
        collection = client.bigdatanewsclassification.news
        for article in articleList:
            # Skip documents that already exist with exactly these fields.
            if not collection.find_one(article):
                collection.insert_one(article)
        return True
    except Exception:
        return False
    finally:
        # A client is opened on every call; close it so connections are not
        # leaked for each consumed Kafka message.
        if client is not None:
            client.close()
Kafka Consumer
Creates a Kafka consumer instance. This Kafka consumer listens to the “newsarticles” topic and reads each message (news articles) from Kafka. Each message is deserialized from JSON and inserted into MongoDB using the InjectToMongodb() function.
from mongoInjection import InjectToMongodb
from kafka import KafkaConsumer
from json import loads
# Consumer for the "newsarticles" topic: reads from the earliest available
# offset, auto-commits offsets and decodes every value from UTF-8 JSON.
consumer = KafkaConsumer(
    'newsarticles',
    bootstrap_servers=['kafka:29092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda raw: loads(raw.decode('utf-8')),
)

# Store every received message in MongoDB and report failed insertions.
for message in consumer:
    injection = InjectToMongodb(message.value)
    if injection:
        continue
    print("Error!!!")
# print(len(message.value), type(message.value))
Models
Data Pipeline, model training and prediction
Initialize Spark to enable distributed data processing. Create combinations of hyperparameters for model tuning. Prepare the data for machine learning. Train multiple models and select the best-performing one. Use the trained model to predict categories for new articles.
from .mongoRetreive import retriveData
import pandas as pd
from pyspark.sql import dataframe as pysparkDataFrame
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import nltk
from nltk.corpus import stopwords
from pyspark.conf import SparkConf
from pyspark import SparkContext
from itertools import product
# Fetch the NLTK stop-word corpus used by DataPipeline.
nltk.download('stopwords')
# Initialize Spark configuration and session
conf = SparkConf().set("spark.sql.broadcastTimeout", "36000")
# Pass the configuration to the context; it was previously built but never used.
sc = SparkContext.getOrCreate(conf)
numCores = sc.defaultParallelism
print("number of cores", numCores)
# The master URL must not carry embedded quote characters.
# NOTE(review): the context above is created before this builder runs, so the
# builder may reuse it instead of connecting to this master — confirm.
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
    .appName('my-cool-app') \
    .getOrCreate()
# Define hyperparameter ranges for model tuning
# Candidate values for each LogisticRegression keyword argument.
hyperParametersRanges = dict(
    featuresCol=['features'],
    labelCol=['label'],
    maxIter=[10, 50, 100, 200],
    regParam=[0.0, 0.01, 0.1, 1.0],
    elasticNetParam=[0.0, 0.5, 1.0],
    tol=[1e-6, 1e-4, 1e-2],
    fitIntercept=[True, False],
    standardization=[True, False],
    aggregationDepth=[2, 3, 4],
    family=['auto', 'binomial', 'multinomial'],
)
# Cartesian product of all ranges: one keyword-argument dict per combination.
paramGrids = list(map(
    lambda combination: dict(zip(hyperParametersRanges, combination)),
    product(*hyperParametersRanges.values()),
))
# Load dataset either from a CSV file or MongoDB
def GetDataSet(fromFile: bool = False, fileName: str = "NewsData.csv") -> pd.DataFrame:
    """Load the news data set and drop every row that has a missing value.

    Args:
        fromFile: Read ``fileName`` as CSV when true; otherwise call ``retriveData()``.
        fileName: CSV path used only when ``fromFile`` is true.

    Returns:
        The loaded frame without rows containing missing values.
    """
    print("Pulling Data")
    loaded = pd.read_csv(fileName) if fromFile else retriveData()
    print("Data pulled")
    return loaded.dropna()
# Convert given text to lowercase
def ConvertTextToLowerCase(text: str) -> str:
    """Return a lower-cased copy of ``text``."""
    lowered = text.lower()
    return lowered
# Define the data preprocessing pipeline for tokenizing, removing stopwords, and feature extraction
def DataPipeline():
    """Build the (unfitted) preprocessing pipeline.

    Stages, in order: split ``text`` on non-word characters into ``words``;
    remove stop words into ``filtered``; count-vectorize into ``features``
    (vocabulary capped at 10000); index ``category`` into ``label``.
    """
    # NLTK's English stop words, lower-cased, plus a few extra tokens.
    extraStopWords = ["http", "https", "amp", "rt", "t", "c", "the"]
    stopWordList = list(map(ConvertTextToLowerCase, stopwords.words('english'))) + extraStopWords
    stages = [
        RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W"),
        StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(stopWordList),
        CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=0),
        StringIndexer(inputCol="category", outputCol="label"),
    ]
    return Pipeline(stages=stages)
# Map numeric labels to their original category names
def MapLabelandTopics(dataSet):
    """Map each integer label to the category of the first row carrying that label.

    ``dataSet`` must provide ``collect()`` returning rows that can be indexed
    with ``'label'`` and ``'category'``.
    """
    firstCategoryByLabel = {}
    for row in dataSet.collect():
        label = row['label']
        # Only the first row seen for a label decides its category.
        if label not in firstCategoryByLabel:
            firstCategoryByLabel[label] = row['category']
    return {int(label): category for label, category in firstCategoryByLabel.items()}
# Evaluate the model using various metrics (accuracy, precision, recall, F1 score)
def evaluate(model, testData):
    """Score ``model`` on ``testData`` and record the metrics.

    Args:
        model: Fitted model whose ``transform`` adds a ``prediction`` column.
        testData: Data set with a ``label`` column.

    Returns:
        ``[accuracy, precision, recall, f1Score]``, where precision and recall
        are the weighted variants. The same values are written,
        comma-separated, to ``./results1.txt`` (overwritten on every call).
    """
    predictions = model.transform(testData)
    # Name every metric explicitly: an evaluator created without metricName
    # uses the default metric (f1), so "accuracy" used to report the f1 score.
    metricNames = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
    scores = [
        MulticlassClassificationEvaluator(
            labelCol="label", predictionCol="prediction", metricName=metricName
        ).evaluate(predictions)
        for metricName in metricNames
    ]
    accuracy, precision, recall, f1Score = scores
    # Write the results to a text file
    with open("./results1.txt", 'w') as f:
        f.write(",".join(str(score) for score in scores))
    return [accuracy, precision, recall, f1Score]
# Train a Logistic Regression model on the given dataset using the provided hyperparameters
def TrainModel(dataSet, paramGrid):
    """Fit one LogisticRegression configured by ``paramGrid`` and evaluate it.

    Returns ``(model, [accuracy, precision, recall, f1Score])``.
    """
    # NOTE(review): the weights 0.7/0.2 do not sum to 1; Spark's documentation
    # says randomSplit normalizes them — confirm this split is intended.
    trainingData, testData = dataSet.randomSplit([0.7, 0.2], seed=42)
    estimator = LogisticRegression(**paramGrid)
    model = estimator.fit(trainingData)
    accuracy, precision, recall, f1Score = evaluate(model, testData)
    return model, [accuracy, precision, recall, f1Score]
# Find the best Logistic Regression model by iterating over all hyperparameter combinations
def FindBestLRModel(paramGrids=paramGrids):
    """Train one model per hyperparameter combination and return the most accurate.

    The data set is loaded with ``GetDataSet``, preprocessed with the pipeline
    from ``DataPipeline`` and used to train a model for each entry of
    ``paramGrids``. The search stops early once a model exceeds 0.95 accuracy.

    Args:
        paramGrids: Iterable of keyword-argument dicts for ``LogisticRegression``.

    Returns:
        ``(model, pipelineFit, mappedLabels, accuracy, precision, recall, f1Score)``.
        ``pipelineFit`` is the *fitted* preprocessing pipeline, so it can
        transform new text (as ``LRModelPredict`` does).

    Raises:
        ValueError: If no combination could be trained.
    """
    dataSet = spark.createDataFrame(GetDataSet(fromFile=False))
    dataPipeline = DataPipeline()
    pipelineFit = dataPipeline.fit(dataSet)
    dataSet = pipelineFit.transform(dataSet)
    mappedLabels = MapLabelandTopics(dataSet)

    bestModel = None
    bestScores = None
    # Train the model using different hyperparameter configurations
    for paramGrid in paramGrids:
        try:
            model, scores = TrainModel(dataSet, paramGrid)
        except Exception as error:
            # Some combinations are invalid for this data; skip them, but say
            # so instead of silently ignoring the failure.
            print("Skipping hyperparameters", paramGrid, "->", repr(error))
            continue
        # Strict ">" keeps the first of several equally accurate models.
        if bestScores is None or scores[0] > bestScores[0]:
            bestModel, bestScores = model, scores
        # Accuracy is a fraction in [0, 1]; stop as soon as it is good enough.
        if scores[0] > 0.95:
            break

    if bestModel is None:
        raise ValueError("no Logistic Regression model could be trained")
    accuracy, precision, recall, f1Score = bestScores
    return bestModel, pipelineFit, mappedLabels, accuracy, precision, recall, f1Score
# Use the trained model to predict the category of new input data
def LRModelPredict(model, dataPipeline, params):
    """Return the predicted label for the last row of ``params``.

    ``params`` is converted to a Spark DataFrame, run through
    ``dataPipeline.transform`` and then through ``model.transform``.
    """
    features = dataPipeline.transform(spark.createDataFrame(params))
    rows = model.transform(features).collect()
    lastRow = rows[-1]
    return lastRow['prediction']
# Train and find the best Logistic Regression model.
# NOTE: this call sits at module level, so importing this module loads the
# data and runs the whole model search.
LRModel1, LRDataPipeline, LRMappedLabels, LRAccuracy, LRPrecision, LRRecall, LRF1Scores = FindBestLRModel()
Webapp
Create flask app and define api resources
Train different ML models (/train endpoint). Predict news categories (/predict endpoint). Monitor if the server is running (/ping endpoint). The API is documented with Swagger, allowing for easy testing and usage of the API.
import pandas as pd
import numpy as np
from flask import Flask
from flask_restful import Resource, Api, fields, marshal_with
from flask_apispec import marshal_with, doc, use_kwargs
from flask_apispec.views import MethodResource
from marshmallow import Schema, fields
from apispec import APISpec
from flask_cors import CORS, cross_origin
from apispec.ext.marshmallow import MarshmallowPlugin
from flask_apispec.extension import FlaskApiSpec
from Model.Model import trainModel, predictTopic
from Model.logisticRegression import *
from Model.naiveBayes import *
from Model.randomForestClassifier import *
from Model.XGBoost import *
# Names shared between the /train and /predict handlers.
global Datapipelines, MappedLabels, Models, Accuracies
global LRModel, pipelineFit, mappedLabels

# Flask application with CORS enabled and Swagger metadata: the JSON spec is
# served at /documentationJson and the Swagger UI at /.
app = Flask(__name__)
CORS(app)
app.config.update(
    APISPEC_SPEC=APISpec(
        title='News Classifer using Kafka',
        version='v1',
        plugins=[MarshmallowPlugin()],
        openapi_version='2.0.0',
    ),
    APISPEC_SWAGGER_URL='/documentationJson',
    APISPEC_SWAGGER_UI_URL='/',
)
api = Api(app)
class pingResParams(Schema):
    # Response schema for /ping: a single required string field.
    ping = fields.String(
        required=True,
        description="Project pinged",
    )
class ping(MethodResource, Resource):
    # Health check: GET returns the fixed payload {"ping": "pong"}.
    @doc(description='ping', tags=['ping'])
    @cross_origin()
    @marshal_with(pingResParams)
    def get(self):
        return dict(ping='pong')
class train(MethodResource, Resource):
    # GET /train: retrain the model and publish it through module-level names
    # so that /predict can use it.
    @doc(description='train', tags=['train'])
    @cross_origin()
    def get(self):
        global Datapipelines, MappedLabels, Models, Accuracies
        global LRModel, pipelineFit, mappedLabels
        try:
            LRModel, pipelineFit, mappedLabels = trainModel()
            # Per-model searches (FindBestLRModel, FindBestNBModel,
            # FindBestRFCModel, FindBestXGBoostModel) used to run here and are
            # currently disabled. The XGBoost figure below is a fixed value.
            # NOTE(review): LRAccuracy, NBAccuracy and RFCAccuracy are not
            # assigned in this file; presumably they come from the star
            # imports — confirm.
            summary = "XGBoostModel: {}, LRModel: {}, NBModel: {}, RFCModel: {}".format(
                0.72, LRAccuracy, NBAccuracy, RFCAccuracy)
        except Exception as e:
            # Report any failure in the response body instead of raising.
            return {'error': str(e)}
        return {'Success': summary}
class predictReqParams(Schema):
    # Request schema for /predict: the text to classify.
    searchText = fields.String(
        required=True,
        description="search Text",
    )
class predict(MethodResource, Resource):
    # POST /predict: classify the submitted text with the model stored by /train.
    @doc(description='predict', tags=['predict'])
    @cross_origin()
    @use_kwargs(predictReqParams, location=('json'))
    def post(self, **args):
        # The model names only exist after /train has run; answer with an
        # explicit error (same shape as /train's) instead of a NameError.
        try:
            model, fittedPipeline, labels = LRModel, pipelineFit, mappedLabels
        except NameError:
            return {'error': 'Model is not trained yet; call /train first'}
        # One-row, one-column frame holding the text to classify.
        params = [[args['searchText']]]
        DataFields = ["text"]
        data = pd.DataFrame(data=np.array(params), columns=DataFields)
        prediction = predictTopic(model, fittedPipeline, data)
        # Translate the numeric prediction back to its category name.
        result = labels[int(prediction)]
        return {'category': result}
# Expose each resource under its route.
for _resource, _route in ((ping, '/ping'), (train, '/train'), (predict, '/predict')):
    api.add_resource(_resource, _route)

# Register the same resources with the Swagger documentation.
docs = FlaskApiSpec(app)
for _resource in (ping, train, predict):
    docs.register(_resource)

# app.get('/train')
if __name__ == '__main__':
    # NOTE(review): debug mode together with host 0.0.0.0 exposes the debug
    # server on every interface — confirm this is limited to development.
    app.run(debug=True, host='0.0.0.0', port=5500)
Frontend
Frontend for the person using the app
import React, {useState} from "react";
import axios from "axios";
function App() {
const [searchItem, SetSearchItem] = useState('')
const handleChange = (event) => {
SetSearchItem( {"searchText" : event.target.value})
}
const [category, SetCategory] = useState('')
const handleSubmit= async(e)=>{
e.preventDefault()
let req = {
method: 'post',
contentType: 'application/json',
url: "http://0.0.0.0:5500/predict",
data: searchItem
}
await axios(req).then(res => {
if(res.data.category){
alert("req Sent!")
}
}, (error) => console.log(error));
}
return (
<div >
<nav className="black white-text">
<div className="nav-wrapper">
<a href="#" className="brand-logo center">Big Data News Classification using Kafka</a>
</div>
</nav>
<div className="container">
<br/>
<br/>
<br/>
<div className="row card-panel">
<form action="#" className="" method="post" onSubmit={handleSubmit}>
<div className="input-field">
<h6>Search Text: </h6>
<input type="text" name="Uname" id="Uname" placeholder='Enter Text' required autocomplete="off" style={{"width" :"100%" }} onChange={handleChange}/>
<label ></label>
</div>
<div className="input-field">
<button type="submit" className="btn" value="Submit"> Submit</button>
</div>
</form>
<h6>Result: </h6>
<div id="result_message">
{category}
</div>
</div>
</div>
</div>
);
}
export default App;