1) Create directories for the Twitter streams
cd /usr/lib/spark
sudo mkdir tweets
cd tweets
sudo mkdir data
sudo mkdir training
sudo chmod 777 /usr/lib/spark/tweets/
These are the two folders we would be using in this project:
data: holds the master copies of the split CSV files, which we would pretend are arriving from a live stream.
training: the directory our machine learning algorithm is trained from; Spark Streaming watches it for new files.
2) Download the Twitter data
sudo wget https://raw.githubusercontent.com/chimpler/tweet-heatmap/master/tweets_tech.csv
3) Split the file (split names the resulting chunks xaa, xab, and so on)
sudo split tweets_tech.csv
4) Move all the split files to data directory
sudo mv /usr/lib/spark/tweets/x* /usr/lib/spark/tweets/data/
5) Create a spark streaming file
sudo vi streaming_ml.py
import sys
import sched, time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.clustering import StreamingKMeans

if __name__ == "__main__":
    sc = SparkContext(appName="StreamingKMeansClustering")
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("file:///tmp/spark")

    # Each line is expected to carry two numeric fields (latitude, longitude)
    def parseTrainingData(line):
        cells = line.split(",")
        return Vectors.dense([float(cells[0]), float(cells[1])])

    # Watch the training directory created in step 1 for newly arriving files
    trainingStream = ssc.textFileStream("file:///usr/lib/spark/tweets/training")\
        .map(parseTrainingData)
    trainingStream.pprint()

    # k=2 clusters over 2-dimensional points, random initial centers seeded with 0
    model = StreamingKMeans(k=2, decayFactor=1.0).setRandomCenters(2, 1.0, 0)
    print("Initial centers: " + str(model.latestModel().centers))
    model.trainOn(trainingStream)

    ssc.start()

    # Print the latest cluster centers every 10 seconds
    s = sched.scheduler(time.time, time.sleep)

    def print_cluster_centers(s, model):
        print("Cluster centers: " + str(model.latestModel().centers))
        s.enter(10, 1, print_cluster_centers, (s, model))

    s.enter(10, 1, print_cluster_centers, (s, model))
    s.run()

    ssc.awaitTermination()
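As a quick sanity check of the parsing step, here is a minimal standalone sketch that runs without Spark. Note that the two-column latitude,longitude layout of each CSV line is an assumption based on the location coordinates printed later in this post:

# Minimal sketch of what parseTrainingData does, runnable with plain Python.
# Assumes the first two CSV fields of each tweet line are latitude and longitude.
def parse_training_data(line):
    cells = line.split(",")
    return [float(cells[0]), float(cells[1])]

print(parse_training_data("22.79589238,-79.61550025"))
# -> [22.79589238, -79.61550025]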
6) Submit the Spark job
spark-submit streaming_ml.py
The job would start listening on the training directory for new files.
****
Note: By default spark-submit might not work and would throw the exception shown below:
"ImportError: No module named numpy"
Solution: Install pip and then numpy.
1) cd to the pyspark directory; you would find the path in the error message. The path shown below is for my sandbox: /usr/hdp/current/spark-client/python/lib/pyspark
2) Install pip
yum install python-pip
3) Install numpy
pip install numpy
4) Upgrade pip (this step might not be needed)
pip install --upgrade pip
Spark Streaming would now be listening for the text stream as shown below.
7) Move a data file from the data folder to the training folder
sudo mv /usr/lib/spark/tweets/data/xaa /usr/lib/spark/tweets/training/
8) Before moving the file, we would see the initial centers
Note:
1) While streaming, any files already present in the directory are ignored; only newly arriving files are picked up.
2) As new files arrive, the cluster centers are updated: with decay factor = 1 the model keeps weighing all the data it has seen so far, while with decay factor = 0 only the most recent batch counts (see the sketch after this note). The k-means algorithm calculates the cluster centers where the tweet locations are concentrated.
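To make the decay factor concrete, here is a small sketch of the per-cluster update rule streaming k-means applies on each batch, following the formula in the Spark MLlib documentation. The point counts of 100 are made up purely for illustration; the coordinates are values from this walkthrough.

# Sketch of the streaming k-means update for one cluster center.
# alpha is the decay factor; batch_mean and batch_count summarize the
# new points assigned to this cluster in the current batch.
def update_center(center, n, batch_mean, batch_count, alpha):
    n_new = n * alpha + batch_count
    center_new = [(c * n * alpha + x * batch_count) / n_new
                  for c, x in zip(center, batch_mean)]
    return center_new, n_new

old = [1.76405235, 0.40015721]        # one of the random initial centers
batch = [22.79589238, -79.61550025]   # mean of a new batch of tweet locations

# decayFactor=1: old data and the new batch are weighted by their point counts
print(update_center(old, 100, batch, 100, 1.0))
# decayFactor=0: the center jumps straight to the new batch mean
print(update_center(old, 100, batch, 100, 0.0))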
9) Initial locations before the first file is moved to the training folder
[ 1.76405235 0.40015721 ]
[ 0.97873798 0.40015721 ]
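As an aside, these initial centers are just seeded random draws. Assuming setRandomCenters uses numpy's seeded generator under the hood (which is how the pyspark source I have seen implements it), they can be reproduced like this:

import numpy as np

# Reproduce the seeded initial centers: 2 clusters in 2 dimensions, seed 0.
print(np.random.RandomState(0).randn(2, 2))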
DECAY FACTOR = 1
When the decay factor is set to 1, the model considers all the data, from the initial batch to the most recent one.
We can see these locations using Google Maps as shown below.
When we zoom out, the locations are as shown below. What we see here is that the tweet locations are concentrated near Nigeria.
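If you want to check a center yourself, one convenient trick (an extra I am adding here, not part of the original workflow) is to build a Google Maps query URL from the coordinates, since Google Maps accepts a q=latitude,longitude parameter:

# Hypothetical helper: turn a cluster center into a Google Maps link.
def maps_url(lat, lon):
    return "https://www.google.com/maps?q={},{}".format(lat, lon)

print(maps_url(1.76405235, 0.40015721))
# https://www.google.com/maps?q=1.76405235,0.40015721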
10) Move the first file to the training folder.
cp /usr/lib/spark/tweets/data/xaa /usr/lib/spark/tweets/training/
Co-ordinates for the 1st cluster
[22.79589238, -79.61550025]
Location after the first file is moved: the cluster center is somewhere between Florida and Cuba.
Co-ordinates for the 2nd cluster
[10.03386625, 103.83779637]
11) Now move the 2nd file to the training folder.
The co-ordinates would change as shown below.
cp /usr/lib/spark/tweets/data/xab /usr/lib/spark/tweets/training/
Finally, after training on all the files, we get the cluster centers shown below.
As the tweet locations move north, the cluster centers keep moving north as well, as shown below.
DECAY FACTOR = 0
When we set the decay factor to 0.0, the model only considers the most recent batch of data.
12) Delete all the files from the data directory.
rm -r x*
13) Create a new streaming_ml.py file with the decay factor set to 0, as shown below
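The only line that changes from the script in step 5 is the model construction:

# Same script as before, but with decayFactor=0 so that only the most
# recent batch of data influences the cluster centers.
model = StreamingKMeans(k=2, decayFactor=0.0).setRandomCenters(2, 1.0, 0)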
14) Repeat steps 10 and 11; we now see that the co-ordinates move very far north, as shown below
link to github