Logo Manvith Reddy Dayam
News Classifier using Kafka

News Classifier using Kafka

February 3, 2024
Table of Contents
GitHub

Kafka Producer

The Kafka producer collects news articles from various sources and sends them to a Kafka broker. A Kafka consumer then reads this stream in real time, and the collected data is used to train the news classifier.

Fetching News articles with different APIs and RSS feeds

Fetching News articles
# Fetch articles related to the provided query string from RapidAPI news API
def GetRapidAPIFeed(queryString):
    """Fetch English-language articles matching ``queryString`` from the RapidAPI free-news search endpoint.

    Args:
        queryString: Search phrase sent as the ``q`` request parameter.

    Returns:
        A list with one dict per article, each restricted to the keys in
        ``attributeList``. ``None`` when the response has no usable
        ``"articles"`` collection or an article lacks one of those keys.
    """
    attributeList = ["title","published_date","link","clean_url","summary","media","topic"]
    url = "https://free-news.p.rapidapi.com/v1/search"
    querystring = {"q":queryString, "lang":"en"}
    headers = {
        'x-rapidapi-host': "free-news.p.rapidapi.com",
        'x-rapidapi-key': ""
    }
    response = requests.request("GET", url, headers=headers, params=querystring)
    json_response = response.json()
    try:
        # The previous code iterated json_response["page_size"], which always
        # failed and made this function return None for every query.
        # NOTE(review): "page_size" is presumably the per-page count and
        # "articles" the result list - confirm against a live response.
        return [
            {attribute: article[attribute] for attribute in attributeList}
            for article in json_response["articles"]
        ]
    except (KeyError, TypeError):
        # Missing key or unexpected payload shape: keep the "no data" contract.
        return None
 
 
# Fetch articles for each query string in the provided list from RapidAPI news API
def GetRapidAPIData(queryStringList):
    """Query RapidAPI once per search phrase and gather the non-empty results.

    Returns a list holding, for every phrase that produced a truthy result,
    the value returned by GetRapidAPIFeed (one entry per phrase, in order).
    """
    fetchedBatches = (GetRapidAPIFeed(phrase) for phrase in queryStringList)
    # Falsy results (None or an empty list) are left out.
    return [batch for batch in fetchedBatches if batch]
 
# Fetches articles from a collection of RSS feed URLs categorized by topics
def GetRssFeed(resources: object) -> list:
    """Collect articles from every feed listed in ``resources``.

    ``resources`` maps a category name to either a single feed URL or a list
    of feed URLs. Each URL is fetched with GetArticles and the results are
    concatenated in iteration order.
    """
    collected = []
    for topic in resources:
        entry = resources[topic]
        # Normalise the single-link case so both shapes share one loop.
        feedLinks = entry if type(entry) == list else [entry]
        for feedLink in feedLinks:
            collected.extend(GetArticles(topic, feedLink))
    return collected
           
 
# Retrieves articles from a given RSS feed URL and extracts relevant text data
def GetArticles(category:str, url: str)-> list:
    """Download one RSS feed and return its items as labelled text records.

    Args:
        category: Label stored under "category" in every returned record.
        url: Address of the RSS feed to download.

    Returns:
        A list of ``{"text": ..., "category": ...}`` dicts, one per ``item``
        element whose title and description could be read. ``text`` is the
        title followed directly by the description, passed through
        CleanArticle.
    """
    res = requests.get(url)
    content = BeautifulSoup(res.content, features='xml') # Parse RSS content using XML parser
    articles = content.findAll('item') # Extract all articles (items) from the RSS feed
    articleData = []
    for article in articles:
        try:
            title = article.find('title').text
            description = article.find('description').text
            text = CleanArticle(title + description)
        except Exception:
            # Best effort: skip items that cannot be read (e.g. a missing
            # title or description). Catching Exception rather than using a
            # bare except lets KeyboardInterrupt/SystemExit propagate.
            continue
        articleData.append({"text":text,"category":category})
    return articleData
 
 #Removes HTML tags in text
def CleanArticle(text: str) -> str:
    """Return ``text`` with every ``<...>`` tag removed.

    A tag runs from a ``<`` to the next ``>``. A ``<`` with no closing ``>``
    is not a tag: it and the rest of the string are kept verbatim (the
    previous implementation raised IndexError in that case).
    """
    keptParts = []
    position = 0
    while position < len(text):
        tagStart = text.find('<', position)
        if tagStart == -1:
            # No further tags: keep the remainder.
            keptParts.append(text[position:])
            break
        keptParts.append(text[position:tagStart])
        tagEnd = text.find('>', tagStart)
        if tagEnd == -1:
            # Unterminated '<': treat it as literal text instead of failing.
            keptParts.append(text[tagStart:])
            break
        position = tagEnd + 1
    return "".join(keptParts)
 
 
 
#example usage
# article  = GetCNNFeed()
# article = GetRssFeed(CNNResources)
# article += GetRssFeed(NewYorkTimeResources)
# article += GetRssFeed(TheGuardianResources)
# data = pd.DataFrame(article, columns=['text','category'])
# with open("test.csv",'w') as file:
#     data.to_csv(file)
 

Kafka Producer

KafkaProducer.py produces the data to the Kafka topic named newsarticles. The script runs in an infinite loop, producing fresh data every 24 hours. This ensures a continuous flow of data into the Kafka topic for real-time processing.

kafka producer
from RapidAPIFeed import GetRapidAPIFeed
from json import dumps
from kafka import KafkaProducer
import time
from RssFeed import GetRssFeed
from APIFeed import *
 
 
# RSS feed catalogues: each dict maps a category label to one feed URL or to
# a list of feed URLs for that category.
CNNResources = {
    "business": "http://rss.cnn.com/rss/money_latest.rss",
    "politics": "http://rss.cnn.com/rss/cnn_allpolitics.rss",
    "tech": "http://rss.cnn.com/rss/cnn_tech.rss",
    "health": "http://rss.cnn.com/rss/cnn_health.rss",
    "entertainment": "http://rss.cnn.com/rss/cnn_showbiz.rss",
}

ReutersResources = {
    "business": "https://www.reutersagency.com/feed/?best-topics=business-finance&post_type=best",
    "politics": "https://www.reutersagency.com/feed/?best-topics=political-general&post_type=best",
    "tech": "https://www.reutersagency.com/feed/?best-topics=tech&post_type=best",
    "health": "https://www.reutersagency.com/feed/?best-topics=health&post_type=best",
    "entertainment": "https://www.reutersagency.com/feed/?best-topics=lifestyle-entertainment&post_type=best",
    "sports": "https://www.reutersagency.com/feed/?best-topics=sports&post_type=best",
}

NewYorkTimeResources = {
    "world": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml",
    "business": "https://rss.nytimes.com/services/xml/rss/nyt/Business.xml",
    "politics": "https://rss.nytimes.com/services/xml/rss/nyt/Politics.xml",
    "tech": "https://rss.nytimes.com/services/xml/rss/nyt/Technology.xml",
    "health": "https://rss.nytimes.com/services/xml/rss/nyt/Health.xml",
    "entertainment": [
        "https://rss.nytimes.com/services/xml/rss/nyt/Movies.xml",
        "https://rss.nytimes.com/services/xml/rss/nyt/Music.xml",
        "https://rss.nytimes.com/services/xml/rss/nyt/Television.xml",
        "https://rss.nytimes.com/services/xml/rss/nyt/Theater.xml",
    ],
    "sports": "https://rss.nytimes.com/services/xml/rss/nyt/Sports.xml",
    "science": "https://rss.nytimes.com/services/xml/rss/nyt/Science.xml",
}

TheGuardianResources = {
    "world": "https://www.theguardian.com/world/rss",
    "sports": "https://www.theguardian.com/uk/sport/rss",
    "tech": "https://www.theguardian.com/uk/technology/rss",
    "science": "https://www.theguardian.com/science/rss",
    "entertainment": [
        "https://www.theguardian.com/music/rss",
        "https://www.theguardian.com/us/film/rss",
        "https://www.theguardian.com/games/rss",
    ],
}

# Search phrases sent to the RapidAPI news endpoint.
queryStringList = [
    "Bitcoin",
    "Harry Potter",
    "Egyptian Pyramids",
    "Fifa",
    "Engineering",
    "Bank Robbery",
]
 
 
# Function to produce data and send it to Kafka topic
def produceData():
    """Fetch the configured feeds once and publish them to the "newsarticles" topic.

    All RSS articles are published as one message (a list of records); each
    RapidAPI query that returns a truthy result is published as its own
    message. The producer is flushed and closed before returning so that
    queued messages are delivered and the connection is not leaked across
    the repeated calls made by the surrounding loop.
    """
    # Create a Kafka producer to send messages to the Kafka broker
    producer = KafkaProducer(bootstrap_servers=['kafka:29092'],value_serializer=lambda x: dumps(x).encode('utf-8'))
    try:
        # Get news articles from different RSS feeds and send to Kafka topic
        articles = GetRssFeed(CNNResources)
        articles += GetRssFeed(NewYorkTimeResources)
        articles += GetRssFeed(ReutersResources)
        articles += GetRssFeed(TheGuardianResources)
        producer.send(topic = "newsarticles", value = articles)

        # Get news articles from RapidAPI using different query strings and send to Kafka topic
        for queryString in queryStringList:
            articles = GetRapidAPIFeed(queryString)
            if articles:
                producer.send(topic = "newsarticles", value = articles)

        # articles = NYAPIFeed()
        # producer.send(topic = "newsarticles", value = articles)

        # send() is asynchronous; block until the queued messages are delivered.
        producer.flush()
    finally:
        # Release sockets and background threads even when a fetch fails.
        producer.close()
 
# Infinite loop to produce data every 24 hours
while True:
    try:
        produceData()
    except Exception as error:
        # Keep the daily loop alive after a failed cycle, but report what went
        # wrong. Catching Exception (not a bare except) lets Ctrl-C and
        # SystemExit stop the script.
        print("except", error)
    time.sleep(60*60*24)
 

Kafka Consumer

Kafka consumer reads data from a Kafka topic and stores it in a MongoDB database for further use

Storing data in mongodb

Function to store data in mongodb

mongodbinjection
from pymongo import MongoClient
 
def InjectToMongodb(articleList: list) -> bool:
    """Insert each article that is not already stored into the ``news`` collection.

    Args:
        articleList: Iterable of article documents. Each one is used both as
            the lookup filter and as the document to insert.

    Returns:
        True when every article was processed, False if connecting or any
        database operation raised.
    """
    client = None
    try:
        client = MongoClient(
            "")

        collection = client.bigdatanewsclassification.news
        for article in articleList:
            # An exact-match lookup serves as the duplicate check.
            if collection.find_one(article) is None:
                collection.insert_one(article)
        return True
    except Exception:
        return False
    finally:
        # This function runs once per consumed message; close the client so
        # connections do not accumulate.
        if client is not None:
            client.close()

Kafka Consumer

Creates a Kafka consumer instance that listens to the “newsarticles” topic and reads each message (a batch of news articles) from Kafka. Each message is deserialized from JSON and inserted into MongoDB using the InjectToMongodb() function.

kafka consumer
from mongoInjection import InjectToMongodb
from kafka import KafkaConsumer
from json import loads
# Subscribe to the "newsarticles" topic; every message value is decoded from
# UTF-8 JSON before it is handed to the loop below.
consumer = KafkaConsumer(
    'newsarticles',
    bootstrap_servers=['kafka:29092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    group_id='my-group',
    value_deserializer=lambda payload: loads(payload.decode('utf-8')),
)

# Store each consumed batch in MongoDB and report batches that failed.
for message in consumer:
    if InjectToMongodb(message.value):
        continue
    print("Error!!!")

Models

Data Pipeline, model training and prediction

Initialize Spark to enable distributed data processing. Create combinations of hyperparameters for model tuning. Prepare the data for machine learning. Train multiple models and select the best-performing one. Use the trained model to predict categories for new articles.

model.py
from .mongoRetreive import retriveData
import pandas as pd
from pyspark.sql import dataframe as pysparkDataFrame
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer, StringIndexer
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import nltk
from nltk.corpus import stopwords
from pyspark.conf import SparkConf
from pyspark import SparkContext
from itertools import product
nltk.download('stopwords')

# Initialize Spark configuration and session
conf = SparkConf().set("spark.sql.broadcastTimeout", "36000")

sc = SparkContext.getOrCreate()
numCores = sc.defaultParallelism
print("number of cores", numCores)

# The master URL previously contained literal double quotes inside the string
# ('"spark://..."'), which is not a valid master URL. The broadcast timeout
# was only set on the unused `conf` object, so it is now also set on the
# session builder.
# NOTE(review): a SparkContext already exists at this point, so the master and
# executor settings below may not take effect - confirm the intended order.
spark = SparkSession.builder \
    .master("spark://spark-master:7077") \
    .config("spark.executor.memory", "2g") \
    .config("spark.executor.extraJavaOptions", "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps") \
    .config("spark.sql.broadcastTimeout", "36000") \
    .appName('my-cool-app') \
    .getOrCreate()
 
# Define hyperparameter ranges for model tuning
# Candidate values for each LogisticRegression keyword argument. Insertion
# order matters: it fixes the order of the generated combinations.
hyperParametersRanges = dict(
    featuresCol=['features'],
    labelCol=['label'],
    maxIter=[10, 50, 100, 200],
    regParam=[0.0, 0.01, 0.1, 1.0],
    elasticNetParam=[0.0, 0.5, 1.0],
    tol=[1e-6, 1e-4, 1e-2],
    fitIntercept=[True, False],
    standardization=[True, False],
    aggregationDepth=[2, 3, 4],
    family=['auto', 'binomial', 'multinomial'],
)

# One dict per point of the Cartesian product of all candidate lists.
paramGrids = [
    dict(zip(hyperParametersRanges, combination))
    for combination in product(*hyperParametersRanges.values())
]
 
# Load dataset either from a CSV file or MongoDB
def GetDataSet(fromFile: bool = False, fileName: str = "NewsData.csv") -> pd.DataFrame:
    """Load the news data and drop every row that has a missing value.

    Reads ``fileName`` as CSV when ``fromFile`` is true; otherwise obtains
    the frame from ``retriveData()``.
    """
    print("Pulling Data")
    rawFrame = pd.read_csv(fileName) if fromFile else retriveData()
    print("Data pulled")
    return rawFrame.dropna()
 
# Convert given text to lowercase
def ConvertTextToLowerCase(text: str) -> str:
    """Return ``text`` lower-cased."""
    lowered = text.lower()
    return lowered
 
# Define the data preprocessing pipeline for tokenizing, removing stopwords, and feature extraction
def DataPipeline():
    """Build the (unfitted) preprocessing Pipeline.

    Stages, in order: split "text" on non-word characters into "words",
    remove stop words into "filtered", count-vectorise into "features", and
    index "category" into "label".
    """
    tokenizerStage = RegexTokenizer(inputCol="text", outputCol="words", pattern="\\W")

    # NLTK's English list, lower-cased, plus a few extra tokens.
    extraStopWords = ["http", "https", "amp", "rt", "t", "c", "the"]
    allStopWords = list(map(ConvertTextToLowerCase, stopwords.words('english'))) + extraStopWords
    removerStage = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(allStopWords)

    vectorizerStage = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=0)
    indexerStage = StringIndexer(inputCol="category", outputCol="label")

    return Pipeline(stages=[tokenizerStage, removerStage, vectorizerStage, indexerStage])
 
# Map numeric labels to their original category names
def MapLabelandTopics(dataSet):
    """Return a dict from integer label to category name.

    Collects every row of ``dataSet`` and, for the first row seen with each
    distinct "label" value, records ``int(label) -> category``.
    """
    seenLabels = set()
    labelToCategory = {}
    for row in dataSet.collect():
        rawLabel = row['label']
        if rawLabel in seenLabels:
            continue
        seenLabels.add(rawLabel)
        labelToCategory[int(rawLabel)] = row['category']
    return labelToCategory
 
# Evaluate the model using various metrics (accuracy, precision, recall, F1 score)
def evaluate(model, testData):
    """Score ``model`` on ``testData`` and write the scores to ./results1.txt.

    Args:
        model: Fitted model whose ``transform`` adds a "prediction" column.
        testData: Data with a "label" column to predict on.

    Returns:
        ``[accuracy, precision, recall, f1Score]``, where precision and
        recall are the weighted variants.
    """
    predictions = model.transform(testData)

    # The accuracy evaluator previously omitted metricName, so it reported
    # the evaluator's default metric (f1) under the name "accuracy".
    metricNames = ["accuracy", "weightedPrecision", "weightedRecall", "f1"]
    scores = [
        MulticlassClassificationEvaluator(
            labelCol="label", predictionCol="prediction", metricName=metricName
        ).evaluate(predictions)
        for metricName in metricNames
    ]

    # Write the results to a text file (overwritten on every call)
    with open("./results1.txt", 'w') as f:
        f.write(",".join(str(score) for score in scores))

    return scores
 
# Train a Logistic Regression model on the given dataset using the provided hyperparameters
def TrainModel(dataSet, paramGrid):
    """Fit one LogisticRegression configured by ``paramGrid`` and score it.

    Returns ``(model, [accuracy, precision, recall, f1Score])`` where the
    scores come from ``evaluate`` on the held-out split.
    """
    # NOTE(review): the weights 0.7 / 0.2 do not sum to 1 - confirm the
    # intended train/test proportions.
    splits = dataSet.randomSplit([0.7, 0.2], seed=42)
    trainingData, testData = splits
    model = LogisticRegression(**paramGrid).fit(trainingData)
    accuracy, precision, recall, f1Score = evaluate(model, testData)
    return model, [accuracy, precision, recall, f1Score]
 
# Find the best Logistic Regression model by iterating over all hyperparameter combinations
def FindBestLRModel(paramGrids=paramGrids, maxModels=1, targetAccuracy=0.95):
    """Train Logistic Regression models and return the most accurate one.

    Args:
        paramGrids: Iterable of keyword-argument dicts, one per candidate
            model.
        maxModels: Stop after this many successful fits. The default of 1
            keeps the previous behaviour (the first configuration that
            trains successfully wins). Pass None to search every entry.
        targetAccuracy: Stop early once a model scores above this value.
            The old check compared against 95 and was unreachable behind an
            unconditional ``break``.

    Returns:
        ``(model, fittedPipeline, mappedLabels, accuracy, precision, recall,
        f1Score)``. The second element is the *fitted* preprocessing
        pipeline: LRModelPredict calls ``.transform`` on it, which the
        unfitted Pipeline returned before does not provide.

    Raises:
        ValueError: If no configuration could be trained.
    """
    dataSet = GetDataSet(fromFile=False)
    dataSet = spark.createDataFrame(dataSet)
    dataPipeline = DataPipeline()
    pipelineFit = dataPipeline.fit(dataSet)
    dataSet = pipelineFit.transform(dataSet)
    mappedLabels = MapLabelandTopics(dataSet)

    # (model, [accuracy, precision, recall, f1Score]) per successful fit.
    trained = []
    for paramGrid in paramGrids:
        try:
            model, scores = TrainModel(dataSet, paramGrid)
        except Exception as error:
            # Some combinations are expected to be rejected by the estimator;
            # report them and move on instead of hiding the reason.
            print("Skipping hyperparameters", paramGrid, error)
            continue
        trained.append((model, scores))
        if maxModels is not None and len(trained) >= maxModels:
            break
        if scores[0] > targetAccuracy:
            break

    if not trained:
        raise ValueError("No Logistic Regression model could be trained")

    # max() returns the first maximal entry, matching the old
    # accuracies.index(max(accuracies)) tie-break.
    model, (accuracy, precision, recall, f1Score) = max(trained, key=lambda entry: entry[1][0])
    return model, pipelineFit, mappedLabels, accuracy, precision, recall, f1Score
 
# Use the trained model to predict the category of new input data
def LRModelPredict(model, dataPipeline, params):
    """Predict the label of the last row of ``params``.

    ``params`` is converted to a Spark DataFrame, passed through
    ``dataPipeline.transform`` and then ``model.transform``; the
    "prediction" value of the final collected row is returned.
    """
    sparkFrame = spark.createDataFrame(params)
    featureFrame = dataPipeline.transform(sparkFrame)
    predictedRows = model.transform(featureFrame).collect()
    lastRow = predictedRows[-1]
    return lastRow['prediction']
 
# Train and find the best Logistic Regression model.
# This statement is at module level, so the data load and training run as soon
# as the module is imported; the unpacked names become module globals.
LRModel1, LRDataPipeline, LRMappedLabels, LRAccuracy, LRPrecision, LRRecall, LRF1Scores = FindBestLRModel()

Webapp

Create flask app and define api resources

Train different ML models (/train endpoint). Predict news categories (/predict endpoint). Monitor if the server is running (/ping endpoint). The API is documented with Swagger, allowing for easy testing and usage of the API.

webapi
import pandas as pd
import numpy as np
from flask import Flask
from flask_restful import Resource, Api, fields, marshal_with
from flask_apispec import marshal_with, doc, use_kwargs
from flask_apispec.views import MethodResource
from marshmallow import Schema, fields
from apispec import APISpec
from flask_cors import CORS, cross_origin
from apispec.ext.marshmallow import MarshmallowPlugin
from flask_apispec.extension import FlaskApiSpec
from Model.Model import trainModel, predictTopic
from Model.logisticRegression import *
from Model.naiveBayes import *
from Model.randomForestClassifier import *
from Model.XGBoost import *
# Names shared between the /train and /predict handlers.
# NOTE(review): a `global` statement at module level does not create or
# initialise anything; LRModel, pipelineFit and mappedLabels are first
# assigned inside train.get(), so /predict presumably fails until /train has
# succeeded (unless one of the star imports above defines them) - confirm.
global Datapipelines
global MappedLabels
global Models
global Accuracies
 
global LRModel, pipelineFit, mappedLabels
 
app = Flask(__name__)
CORS(app)

# API-spec settings: the spec JSON is served at /documentationJson and the
# Swagger UI at the site root.
app.config['APISPEC_SPEC'] = APISpec(
    title='News Classifer using Kafka',
    version='v1',
    plugins=[MarshmallowPlugin()],
    openapi_version='2.0.0',
)
app.config['APISPEC_SWAGGER_URL'] = '/documentationJson'
app.config['APISPEC_SWAGGER_UI_URL'] = '/'

api = Api(app)
 
# Response schema for GET /ping: a single required string field "ping".
class pingResParams(Schema):
    ping = fields.String(required=True, description="Project pinged")
# Health-check resource: GET returns the constant body {'ping': 'pong'},
# marshalled through pingResParams.
class ping(MethodResource, Resource):
    @doc(description='ping', tags=['ping'])
    @cross_origin()
    @marshal_with(pingResParams)
    def get(self):
        return {'ping':'pong'}
 
 
# Training resource: GET calls trainModel() and stores the returned model,
# fitted pipeline and label mapping in module globals for /predict to use.
# Returns {'Success': <summary string>} on success and {'error': <message>}
# when anything raises (no explicit status code is set in either case).
class train(MethodResource, Resource):
    @doc(description='train', tags=['train'])
    @cross_origin()
    def get(self):
        global Datapipelines
        global MappedLabels
        global Models
        global Accuracies
        global LRModel, pipelineFit, mappedLabels
        try:
            LRModel, pipelineFit, mappedLabels = trainModel()
            # # try:
            # LRModel1,LRDataPipeline, LRMappedLabels, LRAccuracy, LRPrecision, LRRecall, LRF1Scores=FindBestLRModel()
            # # except:
            # #     LRModel1,LRDataPipeline, LRMappedLabels, LRAccuracy = 0,0,0,0
            # #     print("LRModel1")
            # try: 
            #     NBModel,NBDataPipeline, NBMappedLabels, NBAccuracy, NBPrecision, NBRecall, LRF1Scores= FindBestNBModel()
            # except:
            #     NBModel,NBDataPipeline, NBMappedLabels, NBAccuracy = 0,0,0,0
            #     print("NBModel")
            # try:
            #     RFCModel, RFCDataPipeline, RFCMappedLabels, RFCAccuracy, RFCPrecision, RFCRecall, RFCF1Scores = FindBestRFCModel()
            # except:
            #     RFCModel, RFCDataPipeline, RFCMappedLabels, RFCAccuracy = 0,0,0,0
            #     print("RFCModel")
            # try:
            #     XGBoostModel, XGBoostDataPipeline, XGBoostMappedLabels, XGBoostAccuracy=FindBestXGBoostModel()
            # except:
            #     XGBoostModel, XGBoostDataPipeline, XGBoostMappedLabels, XGBoostAccuracy = 0,0,0,0
            #     print("XGBoostModel")
            # The XGBoost figure is the hard-coded literal 0.72.
            # NOTE(review): LRAccuracy, NBAccuracy and RFCAccuracy are not
            # assigned in this class; presumably they come from the star
            # imports of the Model modules - verify, otherwise this line
            # raises NameError and the handler returns the 'error' body.
            accuracies = "XGBoostModel: {}, LRModel: {}, NBModel: {}, RFCModel: {}".format(0.72, LRAccuracy , NBAccuracy, RFCAccuracy )  
            return {'Success':accuracies}
        except Exception as e: 
            return {'error':str(e)}
 
 
# Request schema for POST /predict: a single required string field "searchText".
class predictReqParams(Schema):
    searchText = fields.String(required=True, description="search Text")
# Prediction resource: POST takes a JSON body with "searchText" and answers
# with {'category': <name>} looked up in the module-level mappedLabels.
class predict(MethodResource, Resource):
    @doc(description='predict', tags=['predict'])
    @cross_origin()
    @use_kwargs(predictReqParams, location=('json'))
    def post(self,**args):
        # Wrap the submitted text as a one-row frame with a single "text" column.
        submittedText = args['searchText']
        textMatrix = np.array([[submittedText]])
        inputFrame = pd.DataFrame(data = textMatrix,columns=["text"])
        # The predicted label is converted to int and mapped back to its name.
        labelIndex = int(predictTopic(LRModel,pipelineFit,inputFrame))
        return {'category': mappedLabels[labelIndex]}
 
# Expose the resources as routes and register them with the API documentation.
api.add_resource(ping, '/ping')
api.add_resource(train, '/train')
api.add_resource(predict, '/predict')
docs = FlaskApiSpec(app)
docs.register(ping)
docs.register(train)
docs.register(predict)
# app.get('/train')
if __name__ == '__main__':
    import os

    # The server listens on every interface, so the interactive debugger must
    # not be on by default. Opt in for local work with FLASK_DEBUG=1 (or true).
    debugEnabled = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true")
    app.run(debug=debugEnabled, host='0.0.0.0',port=5500)
 

Frontend

Frontend for the person using the app

frontend
import React, {useState} from "react";
import axios from "axios";
 
function App() {
  const [searchItem, SetSearchItem] = useState('')
  const handleChange = (event) => {
    SetSearchItem( {"searchText" : event.target.value})
    }
  const [category, SetCategory] = useState('')
  const handleSubmit= async(e)=>{
    e.preventDefault()
    let req = {
        method: 'post',
        contentType: 'application/json',
        url: "http://0.0.0.0:5500/predict",
        data: searchItem
      }
      await axios(req).then(res => {
        if(res.data.category){
            alert("req Sent!")
        }
      }, (error) => console.log(error));
}
  return (
    <div >
      <nav className="black white-text">
        <div className="nav-wrapper">
          <a href="#" className="brand-logo center">Big Data News Classification using Kafka</a>
        </div>
      </nav>
      <div className="container">
        <br/>
        <br/>
        <br/>
        <div className="row card-panel">
        <form action="#" className="" method="post" onSubmit={handleSubmit}>
                <div className="input-field">
                    <h6>Search Text: </h6>
                    <input type="text" name="Uname" id="Uname" placeholder='Enter Text' required autocomplete="off" style={{"width" :"100%" }} onChange={handleChange}/>
                    <label ></label>
                </div>
                <div className="input-field">
                    <button type="submit" className="btn" value="Submit"> Submit</button>
                </div>  
            </form>
            <h6>Result: </h6>
            <div id="result_message">
              {category}
            </div>
 
    </div>
        </div>
 
    </div>
  );
}
 
export default App;