写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

2016-05-24 17:454952
用户头像

发布了 43 篇内容, 共 28.4 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

基于springcloud springboot vue elementui商城源码毕设实战

清风

Vue Java 分布式 毕业设计

CODING DevOps 高级架构师王炜入选木兰开源社区首批导师

CODING DevOps

DevOps Nocalhost 木兰开源社区 开发者生态

破解AI开课难题!2021 全国人工智能师资培训落地厦门大学

百度大脑

人工智能

高可用架构(下)

编号94530

数据库 架构设计 异地多活容灾 高可用架构

Go- 常量

HelloBug

常量 const Go 语言

还在死磕 Ajax?那可就 out 了!

编程三昧

JavaScript 大前端 8月日更 Fetch

python3学习笔记-20210817(变量名与字符串)

姬翔

OceanBase源码解读(二):SQL的一生

OceanBase 数据库

数据库 分布式数据库 oceanbase OceanBase 开源 OceanBase 社区版

Go-基本元素

HelloBug

Go 语言

简简单单实现 Python Web 的登录注册页面,还包含一半逻辑。

梦想橡皮擦

8月日更

【架构设计模块五】:设计微博系统中”微博评论“的高性能高可用计算架构

Ryoma

蔚来事故背后,“致命弯道”在辅助驾驶和自动驾驶之间

脑极体

java springboot微信小程序授权登录开发

清风

小程序 java小程序

Flink 和流式应用运维(十-上)

Databri_AI

flink API REST API

微服务架构师-docker私有镜像仓库的配置和使用

学神来啦

Linux 容器 微服务 运维 架构师

DAPP去中心化交易所开发|DAPP与APP的区别

Geek_23f0c3

交易所开发 去中心化交易所系统开发 DAPP智能合约交易系统开发

从0开始的TypeScriptの十:泛型

空城机

typescript 大前端 8月日更

JavaScript单元测试的“抹茶”组合:Mocha和Chai

devpoint

JavaScript 单元测试 8月日更

2021 OceanBase 数据库大赛来袭!邀你改编世界,码出未来

OceanBase 数据库

数据库 oceanbase OceanBase 开源 OceanBase 社区版 OceanBase 数据库大赛

架构实战营模块五作业

maybe

ipfs挖矿是怎样赚钱的?ipfs挖矿值得投资吗?

IPFS挖矿值得投资吗 IPFS挖矿是怎样赚钱的

上游思维:在系统的关键处找一个支点

石云升

读书笔记 8月日更 上游思维

🏆【分布式技术专题】【分布式技术专题】RocketMQ延迟消息实现原理和源码分析

洛神灬殇

RocketMQ 延时队列 8月日更 DelayedQueue

云原生时代到来了么?

escray

学习 极客时间 如何落地业务建模 8月日更

Go- 变量

HelloBug

变量 Go 语言

Magician has released a new version

Magician网络编程包

Java Web 网络编程 io nio

敏捷开发

LeifChen

Scrum 敏捷开发 迭代 8月日更

「最好」的敌人是「好」

非著名程序员

提升认知 认知提升 个人提升 8月日更

ipfs矿机公司实力排行如何?ipfs矿机排名如何?

ipfs矿机公司实力排行如何 ipfs矿机排名如何

GIT远程仓库

一个大红包

8月日更

耗时24小时整理了网络安全学习路线,非常详细!

网络安全学海

黑客 网络安全 信息安全 渗透测试 漏洞挖掘

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章