随着组织产生的数据爆炸性增长,从 GB 到 TB,从 TB 到 PB,传统的数据库已经无法通过垂直扩展来管理如此之大数据。传统方法存储和处理数据的成本将会随着数据量增长而显著增加。这使得很多组织都在寻找一种经济的解决方案,比如 NoSQL 数据库,它提供了所需的数据存储和处理能力、扩展性和成本效率。NoSQL 数据库不使用 SQL 作为查询语言。这种数据库有多种不同的类型,比如文档结构存储、键值结构存储、图结构、对象数据库等等。
由于 NoSQL 数据库具有高速的读写能力,因此它的的典型应用包括归档历史日志、事件日志、电子商务日志、游戏数据、社交数据等,存储下来的数据是为了进行后续处理,以得到关于用户以及他们使用情况的有用信息。
我们在本文中使用的 NoSQL 是 MongoDB ,它是一种开源的文档数据库系统,开发语言为 C++。它提供了一种高效的面向文档的存储结构,同时支持通过 MapReduce 程序来处理所存储的文档;它的扩展性很好,而且支持自动分区。Mapreduce 可以用来实现数据聚合。它的数据以 BSON(二进制 JSON)格式存储,在存储结构上支持动态 schema,并且允许动态查询。和 RDBMS 的 SQL 查询不同, Mongo 查询语言以 JSON 表示。
MongoDB 提供了一个聚合框架,其中包括常用功能,比如 count、distinct 和 group。然而更多的高级聚合函数,比如 sum、average、max、min、variance(方差)和 standard deviation(标准差)等需要通过 MapReduce 来实现。
这篇文章描述了在 MongoDB 存储的文档上使用 MapReduce 来实现通用的聚合函数,如 sum、average、max、min、variance 和 standard deviation;聚合的典型应用包括销售数据的业务报表,比如将各地区的数据分组后计算销售总和、财务报表等。
我们从本文示例应用所需软件的安装开始。
软件安装
首先在本地机器上安装并设置 MongoDB 服务。
- 从 Mongo 网站上下载 MongoDB,解压到本地目录,比如 C:>Mongo
- 在上一个文件夹内创建数据目录。比如:C:\Mongo\Data
- 如果数据文件存放在其他地方,那么在用 mongod.exe 命令启动 MongoDB 时,需要在命令行加参数—-dbpath
- 启动服务
- MongoDB 提供了两种方式:mongod.exe 以后台进程启动;mongo.exe 启动命令行界面,可做管理操作。这两个可执行文件都位于 Mongo\bin 目录下;
- 进入 Mongo 安装目录的 bin 目录下,比如:C:> cd Mongo\bin
- 有两种启动方式,如下:
mongod.exe –dbpath C:\Mongo\data
或者 ``` mongod.exe –config mongodb.config
mongodb.config 是 Mongo\\bin 目录下的配置文件,需要在此配置文件中指定数据目录(比如,dbpath= C:\\Mongo\\Data)的位置。 - 连接到 MongoDB,到这一步,mongo 后台服务已经启动,可以通过 http://localhost:27017 查看。 MongoDB 启动运行后,我们接下来看它的聚合函数。 ## 实现聚合函数 在关系数据库中,我们可以在数值型字段上执行包含预定义聚合函数的 SQL 语句,比如,SUM()、COUNT()、MAX() 和 MIN()。但是在 MongoDB 中,需要通过 MapReduce 功能来实现聚合以及批处理,它跟 SQL 里用来实现聚合的 GROUP BY 从句比较类似。下一节将描述关系数据库中 SQL 方式实现的聚合和相应的通过 MongoDB 提供的 MapReduce 实现的聚合。 为了讨论这个主题,我们考虑如下所示的 Sales 表,它以 MongoDB 中的反范式形式呈现。 **Sales 表 ** \# 列名 数据类型 **1** OrderId INTEGER **2** OrderDate STRING **3** Quantity INTEGER **4** SalesAmt DOUBLE **5** Profit DOUBLE **6** CustomerName STRING **7** City STRING **8** State STRING **9** ZipCode STRING **10** Region STRING **11** ProductId INTEGER **12** ProductCategory STRING **13** ProductSubCategory STRING **14** ProductName STRING **15** ShipDate STRING 我们提供了一个查询的样例集,这些查询使用聚合函数、过滤条件和分组从句,及其等效的 MapReduce 实现,即 MongoDB 实现 SQL 中 GROUP BY 的等效方式。在 MongoDB 存储的文档上执行聚合操作非常有用,这种方式的一个限制是聚合函数(比如,SUM、AVG、MIN、MAX)需要通过 mapper 和 reducer 函数来定制化实现。 MongoDB 没有原生态的用户自定义函数(UDFs)支持。但是它允许使用 db.system.js.save 命令来创建并保存 JavaScript 函数,JavaScript 函数可以在 MapReduce 中复用。下表是一些常用的聚合函数的实现。稍后,我们会讨论这些函数在 MapReduce 任务中的使用。 ** 聚合函数 ** **Javascript 函数 ** `SUM`
db.system.js.save( { _id : “Sum” ,
value : function(key,values)
{
var total = 0;
for(var i = 0; i < values.length; i++)
total += values[i];
return total;
}});
`AVERAGE`
db.system.js.save( { _id : “Avg” ,
value : function(key,values)
{
var total = Sum(key,values);
var mean = total/values.length;
return mean;
}});
`MAX`
db.system.js.save( { _id : “Max” ,
value : function(key,values)
{
var maxValue=values[0];
for(var i=1;i
`MIN`
db.system.js.save( { _id : “Min” ,
value : function(key,values)
{
var minValue=values[0];
for(var i=1;i
`VARIANCE`
db.system.js.save( { _id : “Variance” ,
value : function(key,values)
{
var squared_Diff = 0;
var mean = Avg(key,values);
for(var i = 0; i < values.length; i++)
{
var deviation = values[i] - mean;
squared_Diff += deviation * deviation;
}
var variance = squared_Diff/(values.length);
return variance;
}});
`STD DEVIATION`
db.system.js.save( { _id : “Standard_Deviation”
, value : function(key,values)
{
var variance = Variance(key,values);
return Math.sqrt(variance);
}});
SQL 和 MapReduce 脚本在四种不同的用例场景中实现聚合函数的代码片段如下表所示。 **1. 各地区的平均订单量 ** 下面的查询是用来获取不同地区的平均订单量。 **SQL Query** **MapReduce Functions** `SELECT`
db.sales.runCommand(
{
mapreduce : “sales” ,
`City,` `State,` `Region,`
map:function()
{ // emit function handles the group by
emit( {
// Key
city:this.City,
state:this.State,
region:this.Region},
// Values
this.Quantity);
},
`AVG(Quantity)`
reduce:function(key,values)
{
var result = Avg(key, values);
return result;
}
`FROM sales` `GROUP BY City, State, Region`
// Group By is handled by the emit(keys, values)
line in the map() function above
out : { inline : 1 } });
**2. 产品的分类销售总额 ** 下面的查询是用来获取产品的分类销售额,根据产品类别的层级分组。在下面例子中,不同的产品类别作为个体维度,它们也可以被称为更复杂的基于层次的维度。 **SQL 查询 ** **MapReduce 函数 ** `SELECT`
db.sales.runCommand(
{
mapreduce : “sales” ,
`ProductCategory, ProductSubCategory, ProductName,`
map:function()
{
emit(
// Key
{key0:this.ProductCategory,
key1:this.ProductSubCategory,
key2:this.ProductName},
// Values
this.SalesAmt);
},
`SUM(SalesAmt)`
reduce:function(key,values)
{
var result = Sum(key, values);
return result;
}
`FROM sales` `GROUP BY ProductCategory, ProductSubCategory, ProductName`
// Group By is handled by the emit(keys, values)
line in the map() function above
out : { inline : 1 } });
**3. 一种产品的最大利润 ** 下面的查询是用来获取一个给定产品基于过滤条件的最大利润。 **SQL 查询 ** **MapReduce 函数 ** `SELECT`
db.sales.runCommand(
{
mapreduce : “sales” ,
`ProductId, ProductName,`
map:function()
{
if(this.ProductId==1)
emit( {
key0:this.ProductId,
key1:this.ProductName},
this.Profit);
},
`MAX(SalesAmt)`
reduce:function(key,values)
{
var maxValue=Max(key,values);
return maxValue;
}
`FROM sales` `WHERE ProductId=’1’`
// WHERE condition implementation is provided in
map() function
`GROUP BY ProductId, ProductName`
// Group By is handled by the emit(keys, values)
line in the map() function above
out : { inline : 1 } });
**4. 总量、总销售额、平均利润 ** 这个场景的需求是计算订单的总数、总销售额和平均利润,订单 ID 在 1 到 10 之间,发货时间在 2011 年的 1 月 1 日到 12 月 31 日之间。下面的查询是用来执行多个聚合,比如,在指定年份以及指定的不同区域和产品类别范围里订单的总数、总销售额和平均利润。 **SQL 查询 ** **MapReduce 函数 ** `SELECT`
db.sales.runCommand(
{ mapreduce : “sales” ,
`Region,` `ProductCategory,` `ProductId,`
map:function()
{
emit( {
// Keys
region:this.Region,
productCategory:this.ProductCategory,
productid:this.ProductId},
// Values
{quantSum:this.Quantity,
salesSum:this.SalesAmt,
avgProfit:this.Profit} );
}
`Sum(Quantity),` `Sum(Sales),` `Avg(Profit)`
reduce:function(key,values)
{
var result=
{quantSum:0,salesSum:0,avgProfit:0};
var count = 0;
values.forEach(function(value)
{
// Calculation of Sum(Quantity)
result.quantSum += values[i].quantSum;
// Calculation of Sum(Sales)
result.salesSum += values[i].salesSum;
result.avgProfit += values[i].avgProfit;
count++;
}
// Calculation of Avg(Profit)
result.avgProfit = result.avgProfit / count;
return result;
},
`FROM Sales` `WHERE` `Orderid between 1 and 10 AND` `Shipdate BETWEEN ‘01/01/2011’ and` `‘12/31/2011’`
query : {
“OrderId” : { “$gt” : 1 },
“OrderId” : { “$lt” : 10 },
“ShipDate” : { “$gt” : “01/01/2011” },
“ShipDate” : { “$lt” : “31/12/2011” },
},
`GROUP BY` `Region, ProductCategory, ProductId`
// Group By is handled by the emit(keys, values)
line in the map() function above
`LIMIT 3;` `<b>limit</b> : 3,` ``` out : { inline : 1 } });
既然我们已经看了在不同业务场景下的聚合函数的代码示例,接下来我们准备来测试这些函数。
测试聚合函数
MongoDB 的 MapReduce 功能通过数据库命令来调用。Map 和 Reduce 函数在前面章节里已经使用 JavaScript 实现。下面是执行 MapReduce 函数的语法。
db.runCommand( { mapreduce : <collection>, map : <mapfunction>, reduce : <reducefunction> [, query : <query filter object>] [, sort : <sorts the input objects using this key. Useful for optimization, like sorting by the emit key for fewer reduces>] [, limit : <number of objects to return from collection>] [, out : <see output options below>] [, keeptemp: <true|false>] [, finalize : <finalizefunction>] [, scope : <object where fields go into javascript global scope >] [, jsMode : true] [, verbose : true] } ) Where the Output Options include: { replace : "collectionName" } { merge : "collectionName" { reduce : "collectionName" } { inline : 1}
下面是用来保存聚合函数并在 MapReduce 中使用的命令。
启动 Mongo 命令行并设置表
- 确保 Mongo 后台进程在运行,然后执行 mongo.exe 启动 Mongo 命令行。
- 使用命令切换数据库:use mydb
- 使用命令查看 Sales 表的内容:db.sales.find()
find 命令的输出如下:
{ "_id" : ObjectId("4f7be0d3e37b457077c4b13e"), "_class" : "com.infosys.mongo.Sales", "orderId" : 1, "orderDate" : "26/03/2011", "quantity" : 20, "salesAmt" : 200, "profit" : 150, "customerName" : "CUST1", "productCategory" : "IT", "productSubCategory" : "software", "productName" : "Grad", "productId" : 1 } { "_id" : ObjectId("4f7be0d3e37b457077c4b13f"), "_class" : "com.infosys.mongo.Sales", "orderId" : 2, "orderDate" : "23/05/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 40, "customerName" : "CUST2", "productCategory" : "IT", "productSubCategory" : "hardware", "productName" : "HIM", "productId" : 1 } { "_id" : ObjectId("4f7be0d3e37b457077c4b140"), "_class" : "com.infosys.mongo.Sales", "orderId" : 3, "orderDate" : "22/09/2011", "quantity" : 40, "salesAmt" : 200, "profit" : 80, "customerName" : "CUST1", "productCategory" : "BT", "productSubCategory" : "services", "productName" : "VOCI", "productId" : 2 } { "_id" : ObjectId("4f7be0d3e37b457077c4b141"), "_class" : "com.infosys.mongo.Sales", "orderId" : 4, "orderDate" : "21/10/2011", "quantity" : 30, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 2 } { "_id" : ObjectId("4f7be0d3e37b457077c4b142"), "_class" : "com.infosys.mongo.Sales", "orderId" : 5, "orderDate" : "21/06/2011", "quantity" : 50, "salesAmt" : 200, "profit" : 20, "customerName" : "CUST3", "productCategory" : "BT", "productSubCategory" : "hardware", "productName" : "CRUD", "productId" : 1 }<br></br>
创建并保存聚合函数
- 通过 MongoDB 命令行窗口执行如下命令:
> db.system.js.save( { _id : "Sum" , value : function(key,values) { var total = 0; for(var i = 0; i < values.length; i++) total += values[i]; return total; }});
- 在示例表 Sales 表上执行 MapReduce 程序
> db.sales.runCommand( { mapreduce : "sales" , map:function() { emit( {key0:this.ProductCategory, key1:this.ProductSubCategory, key2:this.ProductName}, this.SalesAmt); }, reduce:function(key,values) { var result = Sum(key, values); return result; } out : { inline : 1 } });<br></br>
输出如下:
"results" : [ { "_id" : { "key0" : "BT", "key1" : "hardware", "key2" : "CRUD" }, "value" : 400 }, { "_id" : { "key0" : "BT", "key1" : "services", "key2" : "VOCI" }, "value" : 200 }, { "_id" : { "key0" : "IT", "key1" : "hardware", "key2" : "HIM" }, "value" : 200 }, { "_id" : { "key0" : "IT", "key1" : "software", "key2" : "Grad" }, "value" : 200 } ], "timeMillis" : 1, "timing" : { "mapTime" : NumberLong(1), "emitLoop" : 1, "total" : 1 }, "counts" : { "input" : 5, "emit" : 5, "output" : 4 }, <span>"ok" : 1</span><br></br>
总结
MongoDB 提供了面向文档的存储结构,可以很容易扩展支持 TB 级数据。同时也提供了 Map Reduce 功能,可以通过批处理方式使用类 SQL 函数来实现数据聚合。在这篇文章中,我们描述了安装 MongoDB 并使用 MapReduce 特性执行聚合函数的过程,也提供了简单 SQL 聚合的 MapReduce 示例实现。在 MongoDB 中,更复杂的聚合函数也可以通过使用 MapReduce 功能实现。
关于作者
Arun Viswanathan Infosys 公司 Cloud Center of Excellence (CoE) 的技术架构师,该公司在 IT 和商业咨询服务上位于全球领先的地位。Arun 在 Java、JavaEE、云计算以及大数据应用架构的定义和实现方面有 9 年半的工作经验。他现在从事大数据解决方案的设计、开发和咨询。Email: Arun_Viswanathan01@infosys.com .
Shruthi Kumar Infosys 公司 Cloud Center of Excellence (CoE) 的技术分析师,该公司在 IT 和商业咨询服务上位于全球领先的地位。Shruthi 在 Java、网格计算、云计算以及大数据应用架构上有 5 年的工作经验。她现在从事大数据解决方案的开发和咨询。Email: Shruthi_Kumar01@infosys.com .
原文链接: http://www.infoq.com/articles/implementing-aggregation-functions-in-mongodb
评论