写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

2016-05-24 17:455016
用户头像

发布了 43 篇内容, 共 28.7 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

架构实战训练营毕业总结

刘帅

开放报名 | 「RTC 进阶实战高手课」系列课程上线

融云 RongCloud

【ELT.ZIP】OpenHarmony啃论文俱乐部——云计算数据压缩方案

ELT.ZIP

云计算 OpenHarmony 数据压缩 ELT.ZIP

数据库连接池-Druid 源码学习(一)

wjchenge

初始化 Druid 源码、

【架构学习09】——电商秒杀系统

tiger

架构实战营

区块链将掀开人类的新时代

CECBC

RTC 系统音视频传输弱网对抗技术

融云 RongCloud

融云 x DSPORT:拿下游戏社交「实时社区」第一滴血

融云 RongCloud

druid源码学习一

Nick

源码 Druid

linux之history使用技巧

入门小站

Linux

微博评论高性能高可用架构设计

小虾米

设计模式之工厂模式

乌龟哥哥

5月月更

YUV数据分析

Loken

音视频 5月月更

【架构学习10】——毕业总结

tiger

架构实战营

Web3:创作者经济的黄金时代

CECBC

一文看懂Web3.0:元宇宙的基础设施,三大标签颠覆互联网

CECBC

开源之夏 2022 火热来袭!欢迎报名 OpenMLDB 社区项目~

第四范式开发者社区

人工智能 机器学习 数据库 开源 特征平台

【ELT.ZIP】OpenHarmony啃论文俱乐部——大数据框架性能优化系统

ELT.ZIP

大数据 OpenHarmony 压缩算法 ELT.ZIP

『Python』题集⒋

謓泽

Python 5月月更

在线TSV转多行数据工具

入门小站

工具

algorithm中的排序算法详解

工程师日月

算法 5月月更

Hoo网格量化策略 震荡市场中的投资利器

区块链前沿News

量化 Hoo 网格交易

druid源码阅读(一)整体概览

爱晒太阳的大白

5月月更

Druid连接池源码阅读01

石小天

在线2进制8进制10进制16进制进制转换工具

入门小站

工具

开源不易、安全慎行,中国软件如何走向文明?丨RTE 技术环境月报 202205

声网

开源 WebRTC RTE 编解码 技术环境月报

Go 语言入门很简单:Go 语言中操作 MySQL 数据库

宇宙之一粟

Go 语言 MySQL 数据库 5月月更

druid源码阅读1——获取连接与释放连接

张大彪

小红书持续打击炫富行为:自媒体行业不能违背公序良俗

石头IT视角

带你从0->1学习双指针算法

工程师日月

5月月更

kubernetes下的Nginx加Tomcat三部曲之三

程序员欣宸

Java Kubernetes 5月月更

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章