写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

2016-05-24 17:455438
用户头像

发布了 43 篇内容, 共 30.7 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

青藤:省心又省钱!安全运营服务正在成为甲方企业的主流选择

青藤云安全

恒源云(GPUSHARE)_字节跳动的mRASP预训练模型真香

恒源云

字节跳动 机器翻译 语音识别

中国电信发布运营商行业首个云原生关系型数据库TeleDB for openGauss

龙蜥实验室来了!收下这份指南,秒级体验 Anolis OS

OpenAnolis小助手

国产操作系统 龙蜥社区

应急响应-Yara规则木马检测

H

网络安全 应急响应

openGauss Summit 2021你想知道的都在这!

openGauss 2021 感谢有您,一起创造了那些灿烂记忆

小声嘟囔:char 和 unsigned char 有那么大差距吗?

BUG侦探

c ios 汇编 ios开发

Linux之find命令的参数详解

CRMEB

从GitHub 到极狐GitLab 的迁移指南

极狐GitLab

GitHub 极狐GitLab 迁移指南

多IOT设备上跑物联网应用,你也可以

Speedoooo

物联网 IoT ios开发 Andriod开发

分享一个小故事

石云升

故事 1月月更

双碳绿色风中,乘势而起了哪些新能源?

脑极体

一篇从购买服务器到部署博客代码的详细教程

冴羽

nginx 前端 后端 博客 博客搭建

技术说|拓维·建木边缘计算平台,让算力先行一步

拓维信息

云计算 大数据 边缘计算

05 Prometheus之监控主机和容器

穿过生命散发芬芳

Prometheus 1月月更

新思科技:2022年软件安全行业七大趋势预测

InfoQ_434670063458

新思科技 2022 安全趋势

教你实现一个 iOS 重签名工具

37手游iOS技术运营团队

ios xcode

共话数据库技术与行业数字化融合创新,探讨开源数据库未来发展

深入理解虚拟化

极客重生

云计算 容器 虚拟机 调度 资源隔离

一周信创舆情观察(2021.12.27~2022.1.3)

统小信uos

AI开发平台系列1:AI开发平台“家族”概览

Baihai IDP

人工智能 ide AI 平台

LabVIEW图像模式匹配(基础篇—11)

不脱发的程序猿

机器视觉 图像处理 LabVIEW 图像模式匹配

开源demo| anyRTC 互动白板发布,助力实时互动场景

anyRTC开发者

音视频 在线教育 视频会议 智慧协同 开源demo

数字人民币app公开上架应用市场 试点区域外用户暂无法使用

CECBC

云计算厂商们,你们辜负了中国的用户

观测观测

云原生 云计算架构师

技术干货 | WebRTC 技术解析之 Android VDM

网易云信

Java android 音视频 VDM

廖湘科:数据库需要充分利用开源和发展开源,广泛吸纳全产业力量

openGauss数据库源码解析系列文章——存储引擎源码解析(一)

加密货币、去中心化金融和交易的演变:一种交易成本方法

CECBC

kubelet 的主动驱逐POD

Geek_f24c45

Kubernetes kubelet

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章