产品战略专家梁宁确认出席AICon北京站,分享AI时代下的商业逻辑与产品需求 了解详情
写点什么

使用 Spark Streaming 进行情感分析

  • 2016-05-24
  • 本文字数:2054 字

    阅读完需:约 7 分钟

这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。

Spark Streaming

Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。

程序开发设置部分

程序开发起始部分需要做好准备工作。

复制代码
val config = new SparkConf().setAppName("twitter-stream-sentiment")
val sc = new SparkContext(config)
sc.setLogLevel("WARN")
val ssc = new StreamingContext(sc, Seconds(5))
System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
val stream = TwitterUtils.createStream(ssc, None)

这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。

Twitter 上现在的趋势是什么?

很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。

复制代码
val tags = stream.flatMap { status =>
status.getHashtagEntities.map(_.getText)
}
tags.countByValue()
.foreachRDD { rdd =>
val now = org.joda.time.DateTime.now()
rdd
.sortBy(_._2)
.map(x => (x, now))
.saveAsTextFile(s"~/twitter/$now")
}

首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。

分析 Tweets

现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。

有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
build.sbt文件中增加相对应的依赖。

复制代码
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1"
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"

现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:

复制代码
val tweets = stream.filter {t =>
val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase)
tags.contains("#bigdata") && tags.contains("#food")
}

得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:

def detectSentiment(message: String): SENTIMENT_TYPE然后对 detectSentiment 进行测试以确保其可以工作:

复制代码
it("should detect not understood sentiment") {
detectSentiment("") should equal (NOT_UNDERSTOOD)
}
it("should detect a negative sentiment") {
detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE)
}
it("should detect a neutral sentiment") {
detectSentiment("I'm watching a movie") should equal (NEUTRAL)
}
it("should detect a positive sentiment") {
detectSentiment("It was a nice experience.") should equal (POSITIVE)
}
it("should detect a very positive sentiment") {
detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE)
}

完整列子如下:

复制代码
val data = tweets.map { status =>
val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText)
val tags = status.getHashtagEntities.map(_.getText.toLowerCase)
(status.getText, sentiment.toString, tags)
}

data 中包含相关的情感。

和 SQL 协同进行分析

现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:

复制代码
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
data.foreachRDD { rdd =>
rdd.toDF().registerTempTable("sentiments")
}

将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。

sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:

sqlContext.sql("select * from sentiments").show()## 窗口操作

Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
简单的一个窗口操作:

复制代码
tags
.window(Minutes(1))
. (...)

结论

此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。

2016-05-24 17:454981
用户头像

发布了 43 篇内容, 共 28.5 次阅读, 收获喜欢 7 次。

关注

评论

发布
暂无评论
发现更多内容

即时通讯安全篇(十一):IM聊天系统安全手段之传输内容端到端加密技术

JackJiang

网络安全 网络编程 即时通讯 IM openssl

技术分享 | 跨平台API对接(Java)

霍格沃兹测试开发学社

测试左移之Sonarqube scanner使用

霍格沃兹测试开发学社

测试左移之Sonarqube maven项目分析

霍格沃兹测试开发学社

干货 | 实战演练基于加密接口测试测试用例设计

霍格沃兹测试开发学社

干货 | Pytest 结合 Allure 生成测试报告

霍格沃兹测试开发学社

实战演示 H5 性能分析

霍格沃兹测试开发学社

对话彩生活:“互联网+物业”数智化转型的BI应用实践

观远数据

企业号九月金秋榜

干货 | 通用 api 封装实战,带你深入理解 PO

霍格沃兹测试开发学社

Dubbo 3.1.0 正式发布,数据面原生接入 Service Mesh

阿里巴巴云原生

阿里云 云原生 dubbo

【DBA100人】李建明:一名普通DBA的14年技术之路与成长智慧

OceanBase 数据库

2022年Q2银行APP活跃用户规模盘点:头部银行增长稳定

易观分析

金融 银行 用户规模

从 “搞不清楚” 到 “都明白了” 的费曼

图灵社区

量子力学 物理学家

APICloud 可视化编程 - 拖拉拽实现专业级源码

YonBuilder低代码开发平台

低代码开发 多端开发 可视化开发

长安链源码分析启动(1)

长安链

这些并发容器的坑,你要谨记

华为云开发者联盟

后端 开发

低代码适用于哪些应用开发场景

力软低代码开发平台

Python条件语句怎么用

和牛

Python 8月月更

最新出炉!深度解读《中国DevOps现状调查报告(2022)》

嘉为蓝鲸

DevOps

长安链源码分析启动(2)

长安链

他只是试图运用自己的能力,给这个领域带来改变

图灵社区

通信 科学史

长安链源码分析启动(3)

长安链

测试右移之logstash完整配置实例

霍格沃兹测试开发学社

Nft数字藏品app开发,开发数字藏品系统

开源直播系统源码

数字藏品 数字藏品软件开发 数字藏品开发 数字藏品系统

CI 可观测性使变更管理发挥作用|Foresight

观测云

99 大促来袭,利用 MSE 服务自治体系为业务保驾护航

阿里巴巴云原生

阿里云 微服务 云原生

分布式数据中心网络互联技术实现

C++后台开发

数据库 分布式 后端开发 Linux服务器开发 C++开发

Python实战之用内置模块来构建REST服务、RPC服务

山河已无恙

RPC REST API Python.

详解 OpenDAL |Data Infra 研究社第三期

Databend

线上直播 大数据 开源 databend OpenDAL Datafuse Labs

从 “搞不清楚” 到 “都明白了” 的费曼

图灵教育

量子力学 物理学家

「海格通信」化繁为简!云管升级助力海格通信创新之路提速

嘉为蓝鲸

云管理

使用Spark Streaming进行情感分析_语言 & 开发_侠天_InfoQ精选文章