这里将使用 Twitter 流式数据,它符合所有所需:持续而且无止境的数据源。
Spark Streaming
Spark Streaming 在电子书《手把手教你学习Spark》第六章有详细介绍,这里略过Streaming API 的详细介绍,直接进行程序开发 。
程序开发设置部分
程序开发起始部分需要做好准备工作。
val config = new SparkConf().setAppName("twitter-stream-sentiment") val sc = new SparkContext(config) sc.setLogLevel("WARN") val ssc = new StreamingContext(sc, Seconds(5)) System.setProperty("twitter4j.oauth.consumerKey", "consumerKey") System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret") System.setProperty("twitter4j.oauth.accessToken", accessToken) System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret") val stream = TwitterUtils.createStream(ssc, None)
这里创建一个 Spark Context sc,设置日志级别为 WARN 来消除 Spark 生成的日志。使用sc创建 Streaming Contextssc,然后设置 Twitter 证书来获得 Twitter 网站数据。
Twitter 上现在的趋势是什么?
很容易的能够找到任意给定时刻的 Twitter 趋势,仅仅需要计算数据流每个标签的数目。让我们看下 Spark 如何实现这个操作的。
val tags = stream.flatMap { status => status.getHashtagEntities.map(_.getText) } tags.countByValue() .foreachRDD { rdd => val now = org.joda.time.DateTime.now() rdd .sortBy(_._2) .map(x => (x, now)) .saveAsTextFile(s"~/twitter/$now") }
首先从 Tweets 获取标记,并计算标记的数量,按数量排序,然后持久化结果。我们基于前面的结果建立一个监控面板来跟踪趋势标签。作者的同事就可以创建一个广告标记(campaigns),并吸引更多的用户。
分析 Tweets
现在我们想增加一个功能来获得用户主要感兴趣的主题集。为了这个目的我们想对 Tweets 的大数据和食物两个不相关的主题进行情感分析。
有几种 API 可以在 Tweets 上做情感分析,但是作者选择斯坦福自然语言处理组开发的库来抽取相关情感。
在build.sbt文件中增加相对应的依赖。
libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" libraryDependencies += "edu.stanford.nlp" % "stanford-corenlp" % "3.5.1" classifier "models"
现在,我们通过 Streaming 过滤一定的哈希标签,只选择感兴趣的 Tweets,如下所示:
val tweets = stream.filter {t => val tags = t.getText.split(" ").filter(_.startsWith("#")).map(_.toLowerCase) tags.contains("#bigdata") && tags.contains("#food") }
得到 Tweets 上所有标签,然后标记出#bigdata 和 #food 两个标签。
接下来定一个函数从 Tweets 抽取相关的情感:
def detectSentiment(message: String): SENTIMENT_TYPE
然后对 detectSentiment 进行测试以确保其可以工作:
it("should detect not understood sentiment") { detectSentiment("") should equal (NOT_UNDERSTOOD) } it("should detect a negative sentiment") { detectSentiment("I am feeling very sad and frustrated.") should equal (NEGATIVE) } it("should detect a neutral sentiment") { detectSentiment("I'm watching a movie") should equal (NEUTRAL) } it("should detect a positive sentiment") { detectSentiment("It was a nice experience.") should equal (POSITIVE) } it("should detect a very positive sentiment") { detectSentiment("It was a very nice experience.") should equal (VERY_POSITIVE) }
完整列子如下:
val data = tweets.map { status => val sentiment = SentimentAnalysisUtils.detectSentiment(status.getText) val tags = status.getHashtagEntities.map(_.getText.toLowerCase) (status.getText, sentiment.toString, tags) }
data 中包含相关的情感。
和 SQL 协同进行分析
现在作者想把情感分析的数据存储在外部数据库,为了后续可以使用 SQL 查询。
具体操作如下:
val sqlContext = new SQLContext(sc) import sqlContext.implicits._ data.foreachRDD { rdd => rdd.toDF().registerTempTable("sentiments") }
将 Dstream 转换成 DataFrame,然后注册成一个临时表,其他喜欢使用 SQL 的同事就可以使用不同的数据源啦。
sentiment 表可以被任意查询,也可以使用 Spark SQL 和其他数据源(比如,Cassandra 数据等)进行交叉查询。
查询 DataFrame 的列子:
sqlContext.sql("select * from sentiments").show()
## 窗口操作
Spark Streaming 的窗口操作可以进行回溯数据,这在其他流式引擎中并没有。
为了使用窗口函数,你需要 checkpoint 流数据,具体详情见 http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing 。
简单的一个窗口操作:
tags .window(Minutes(1)) . (...)
结论
此列子虽然简单,但是其可以使用 Spark 解决实际问题。我们可以计算 Twitter 上主题趋势。
评论