写点什么

利用 Amazon ElastiCache 寻找附近的 X

  • 2019-11-13
  • 本文字数:5023 字

    阅读完需:约 16 分钟

利用Amazon ElastiCache寻找附近的X
基于地理信息的应用已经越来越深入到日常生活中,人们经常会在应用中寻找附近的朋友,车,餐厅或其它资源。而与此同时,随着物理网技术及设备的普及,应用需要更加实时和精确的处理来自各种数据源(包括用户手机,各种传感器设备及其他系统)的大量数据,以完成相关的搜索和计算距离等操作。


### 架构
对于开发者来说,Redis因为其性能上的优势往往会被采用作为位置数据的缓存,只是在3.2版本之前需要代码中把位置数据进行Geohash后才能有效的排序和分析。不过3.2版本后,Redis已经能够原生支持基于位置信息的存储,计算及搜索了。Amazon ElastiCache是AWS提供的托管型的数据缓存服务,借助该服务,用户能够在云中轻松部署、运行和扩展分布式内存数据存储或缓存。 Amazon ElastiCache 的Redis引擎是一项与 Redis 兼容的内存服务,兼具 Redis 的易用性和强大功能,同时还可为要求最苛刻的应用程序提供适用的可用性、可靠性和性能,提供单节点和多达 15 个分片的群集,从而可将内存数据扩展到高达 3.55TiB。这里,我们可以基于Elasticache并结合AWS其他服务构建出以下的示例架构:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1-1024x717.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1.png)
1)终端设备获取GPS位置信息,定时或基于事件将数据上传到云端。在AWS上可以选择使用IoT或Kinesis等托管型服务作为数据收集的接收端,也可以使用部署在EC2/Lambda上的自定义服务。
2)所有位置信息写入可以自动扩展的DynamoDB,基本Schema包含设备Id/Timestamp/Geo location, 方便历史查询或轨迹查询。
3)打开DynamoDB流,用KCL或Lambda监听DynamoDB的数据改变,并将当前变化的位置数据更新到Elasticache中建立基于Geospatial的索引缓存。
4)手机应用搜索附近资源时,部署在EC2/Lambda的查询服务利用Elasticache geospatial直接获取结果。
### 实现
如前文所述,步骤1和2可选择的方案很多,比如采用AWS IoT服务甚至可以无需任何代码仅通过配置即可完成云端的功能讲数据实时写入相应的DynamoDB表中。因此,本文将着重介绍如何实现前文架构中的3和4步:
a) 打开DynamoDB 流,获取流的ARN用于读取,如下图:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2-1024x662.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2.png)
读取DynamoDB流数据有三种方式:利用Kinesis adapter,利用低级别API以及利用Lambda函数来进行读取。从易用性的角度来说,当然是Lambda函数最简单,不需要考虑shard,吞吐和checkpoint等问题而专注于业务逻辑。但是Lambda函数并不是在所有的AWS区域都支持,因此本文采用第一种方式利用Kinesis adapter完成读取。具体参考文档:http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
b) 在读取流的同时,我们需要将最新的地理位置信息利用GEOADD更新到Elasticache中。前文提到Redis在3.2版本后,Geospatial Indexing已经被原生支持,而它实际上是Sorted List数据结构的一种扩展,即排序 key扩展成了经纬度,如下图所示的数据结构,并且可以方便的使用基于地理信息的API,例如GEOADD——添加地理位置 。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3-1024x204.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3.png)
通过Elasticache可以快速构建出一个Redis环境,包括支持shard的集群模式,如下图所示。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)
构建完成后,通过Elasticache提供的终端节点就可以访问cache了。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)
需要注意的是如果选择的Redis是集群模式,那么就得同步升级支持Redis集群模式的客户端SDK用以开发。因为Redis的集群提供的是分片功能,它会把不同的slots分布在不同的节点上,需要由客户端通过CRC16(Key)取模从而计算出数据在哪个节点上。目前可以支持redis集群模式的客户端有很多,比如本文用到的java的jedis以及nodejs的ioredis。
综合a,b两步的示例代码的StreamCacheProcessor.java如下(其余代码参考http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html ):
Java
复制代码


   import java.nio.charset.Charset;   import java.util.HashMap;   import java.util.HashSet;   import java.util.List;   import java.util.Map;   import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.GeoCoordinate import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record;
public class StreamsCacheProcessor implements IRecordProcessor {
private static Log LOG = LogFactory.getLog(StreamsCacheProcessor.class);
private Integer checkpointCounter;
private String clusterHost; private int port; private JedisCluster jedis;
public StreamsCacheProcessor(String clusterHost,int port) { this.clusterHost=clusterHost; this.port=port; }
@Override public void initialize(String shardId) { Set jedisClusterNode=new HashSet(); jedisClusterNode.add(new HostAndPort(clusterHost,port)); jedis=new JedisCluster(jedisClusterNode); checkp ointCounter = 0; } @Override public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); LOG.debug("Received the data as:"+data); if(record instanceof RecordAdapter) com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject(); //新增GPS数据更新到Elasticache中 if(streamRecord.getEventName().equals("INSERT")){ Map coordinateMap = new HashMap(); double longitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("longitude").getN()); double latitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("latitude").getN()); String deviceId = streamRecord.getDynamodb().getNewImage().get("deviceId").getS(); coordinateMap.put(deviceId, new GeoCoordinate(longitude, latitude));
jedis.geoadd("bikes", coordinateMap); LOG.info("Updated "+deviceId+" GPS information as:"+longitude+","+latitude); } } checkpointCounter += 1; if(checkpointCounter % 10 == 0){ //checkpoint大小需根据实际需求调整 try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
}
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if(reason == ShutdownReason.TERMINATE) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }
复制代码


c)在完成地理信息的实时更新后, 可以基于Elasticache的数据利用GEORADIUS搜索周边的资源。使用nodejs示例代码如下:
Java
复制代码


   var express = require('express');   var app = express();   var Redis = require('ioredis');
var cluster = new Redis.Cluster([{ port:6379, host:'lab-cluster.4bh9j8.clustercfg.cnn1.cache.amazonaws.com.cn' }]);
app.get('/bikes', function(req,res){ if(req.query['longitude'] && req.query['latitude']){ console.log ('longitude = %s,latitude = %s',req.query['longitude'],req.query['latitude']); cluster.send_command('GEORADIUS', [ 'bikes',req.query['longitude'],req.query['latitude'],2000, 'm', 'WITHDIST', 'WITHCOORD', 'COUNT', 10], (error, reply) =>{
if (error) { res.status(500).send("无法获取附近车辆信息"); return; }
var stations = reply.map( (r) =>{ return { name: r[0], distance: `${r[1]} m`, coordinates: { latitude: Number(r[2][1]), longitude: Number(r[2][0]) } } });
res.status(200).json(stations); }); } });
var server = app.listen(8080,function(){ var host = server.address().address var port = server.address().port console.log("应用实例,访问地址为 http://%s:%s", host, port) });
复制代码


基于以上代码,服务端就可以返回最近的10个资源以及每个资源离当前位置的距离。例如:  请求  `http://hostname or elb address/bikes?longitude=116&latitude=39.4`  返回
Java
复制代码


   [    {     "name": "48093ba0-f8f1-49f0-b312-285800341b08",     "distance": "1117.8519 m",      "coordinates": {       "latitude": 39.40640623614937,       "longitude": 116.01002186536789    }   },   {     "name": "950fb5df-c0ff-4a95-90ea-2f5f574c5796",     "distance":"1305.5083 m",     "coordinates": {      "latitude": 39.40184880750488,      "longitude": 116.01500004529953     }    },   ……   ]
复制代码


d)通过封装http请求构建手机应用。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map-503x1024.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map.png)
### 总结
Redis Geospatial功能可以让开发者更高效的搜索和计算位置信息的记录。同时,Amazon ElastiCache提供的托管Redis服务大大简化了对于Redis集群的维护工作,包括搭建,备份和迁移等工作。最后,自动扩展的Amazon DynamoDB则负责位置信息数据的持久化和检索,而它的流功能也使得数据能够快速实时的流转起来。


**作者介绍**
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)
赵霏,AWS解决方案架构师。负责基于AWS的云计算方案架构咨询和设计,同时致力于AWS云服务在国内的应用和推广。他拥有超过13年IT行业从业经验,长期专注于企业IT云转型、物联网、移动互联网、Devops等领域,在大规模后台架构、分布式计算和自动化运维等方面有着广泛的设计和实践经验。
复制代码


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/using-amazon-elasticache-to-find-x/


2019-11-13 08:001008

评论

发布
暂无评论
发现更多内容

假如让你来设计SSL/TLS协议,你要怎么设计呢?

华为云开发者联盟

网络安全 HTTP 通信 SSL/TLS 协议 网络通信安全

Tech Talk 活动预告 | 为什么说 Serverless 是应用开发的未来?

亚马逊云科技 (Amazon Web Services)

Serverless

springsecurity默认用户生成

急需上岸的小谢

「架构实战营」模块四作业 考试试卷存储方案

hxb

「架构实战营」

《大饼卷一切》爆笑相声剧 今晚开票!

InfoQ 天津

appsmith 怎么用?评价如何

蒋川

appsmith

Android TabLayout 选中 tab 文字加粗显示

逆锋起笔

android 3月月更 TabLayout android滑动标签

Dubbo服务如何优雅的校验参数

vivo互联网技术

dubbo 服务器 java;

如何避免在面试中看走眼

Hockor

个人成长 面试经验

业内首家!百度智能云智慧金融业务通过ISO37301合规管理体系认证

百度大脑

免费硬件、专属导师、豪华大礼|AI达人创造营第二期项目征集启动啦!

百度大脑

12 款最棒 Vue 开源 UI 库测评 - 特别针对国内使用场景推荐

蒋川

Vue vue admin

如何在 Vue 中使用 Chart.js - 手把手教你搭可视化数据图表

蒋川

Vue PDF pdf阅读器

微服务工程中,基础组件应用

架构 分布式 微服务

大画 Spark :: 网络(5)-Spark中的server端和client端

dclar

大数据 hadoop spark Spark 源码 大数据开发

Hoo虎符研究院|区块链简报20220307期

区块链前沿News

Hoo 虎符交易所 虎符研究院

社区人物志|缪翎:见证开源世界的女性力量

ApacheDoris

大数据 开源 数据分析 OLAP apache doris

关于中国芯片,这些话如鲠在喉

脑极体

AI人脸识别测温一体机设计

DS小龙哥

3月月更

跨越DDD从理论到工程落地的鸿沟

华为云开发者联盟

DDD 业务逻辑 领域模型 设计思想 业务治理

小程序的第六年,我们还能怎么玩?

知晓云

小程序 微信 小程序生态 小程序运营

Flutter 容器盒子布局模型

岛上码农

flutter ios 安卓 移动端开发 3月月更

selenium操作元素遇到的异常

红毛丹

selenium

安全代码审计-PHP

网络安全学海

网络安全 信息安全 渗透测试 漏洞 代码审计

惨,给Go提的代码被批麻了

捉虫大师

Go 开源 Code Review

java高级用法之:无所不能的java,本地方法调用实况

程序那些事

Java Netty 程序那些事 3月月更

python 编辑器提示 do not use bare except

AlwaysBeta

Python vscode 编辑器 pycharm Python PEP

CompusAss校园社团小程序解决方案

CC同学

高精度轻量级目标检测产业应用,实现多类通信塔识别

百度大脑

React Draggable 实现拖拽 - 最详细中文教程 - 卡拉云

蒋川

React

利用Amazon ElastiCache寻找附近的X_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章