写点什么

利用 Amazon ElastiCache 寻找附近的 X

  • 2019-11-13
  • 本文字数:5023 字

    阅读完需:约 16 分钟

利用Amazon ElastiCache寻找附近的X
基于地理信息的应用已经越来越深入到日常生活中,人们经常会在应用中寻找附近的朋友,车,餐厅或其它资源。而与此同时,随着物理网技术及设备的普及,应用需要更加实时和精确的处理来自各种数据源(包括用户手机,各种传感器设备及其他系统)的大量数据,以完成相关的搜索和计算距离等操作。


### 架构
对于开发者来说,Redis因为其性能上的优势往往会被采用作为位置数据的缓存,只是在3.2版本之前需要代码中把位置数据进行Geohash后才能有效的排序和分析。不过3.2版本后,Redis已经能够原生支持基于位置信息的存储,计算及搜索了。Amazon ElastiCache是AWS提供的托管型的数据缓存服务,借助该服务,用户能够在云中轻松部署、运行和扩展分布式内存数据存储或缓存。 Amazon ElastiCache 的Redis引擎是一项与 Redis 兼容的内存服务,兼具 Redis 的易用性和强大功能,同时还可为要求最苛刻的应用程序提供适用的可用性、可靠性和性能,提供单节点和多达 15 个分片的群集,从而可将内存数据扩展到高达 3.55TiB。这里,我们可以基于Elasticache并结合AWS其他服务构建出以下的示例架构:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1-1024x717.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1.png)
1)终端设备获取GPS位置信息,定时或基于事件将数据上传到云端。在AWS上可以选择使用IoT或Kinesis等托管型服务作为数据收集的接收端,也可以使用部署在EC2/Lambda上的自定义服务。
2)所有位置信息写入可以自动扩展的DynamoDB,基本Schema包含设备Id/Timestamp/Geo location, 方便历史查询或轨迹查询。
3)打开DynamoDB流,用KCL或Lambda监听DynamoDB的数据改变,并将当前变化的位置数据更新到Elasticache中建立基于Geospatial的索引缓存。
4)手机应用搜索附近资源时,部署在EC2/Lambda的查询服务利用Elasticache geospatial直接获取结果。
### 实现
如前文所述,步骤1和2可选择的方案很多,比如采用AWS IoT服务甚至可以无需任何代码仅通过配置即可完成云端的功能讲数据实时写入相应的DynamoDB表中。因此,本文将着重介绍如何实现前文架构中的3和4步:
a) 打开DynamoDB 流,获取流的ARN用于读取,如下图:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2-1024x662.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2.png)
读取DynamoDB流数据有三种方式:利用Kinesis adapter,利用低级别API以及利用Lambda函数来进行读取。从易用性的角度来说,当然是Lambda函数最简单,不需要考虑shard,吞吐和checkpoint等问题而专注于业务逻辑。但是Lambda函数并不是在所有的AWS区域都支持,因此本文采用第一种方式利用Kinesis adapter完成读取。具体参考文档:http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
b) 在读取流的同时,我们需要将最新的地理位置信息利用GEOADD更新到Elasticache中。前文提到Redis在3.2版本后,Geospatial Indexing已经被原生支持,而它实际上是Sorted List数据结构的一种扩展,即排序 key扩展成了经纬度,如下图所示的数据结构,并且可以方便的使用基于地理信息的API,例如GEOADD——添加地理位置 。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3-1024x204.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3.png)
通过Elasticache可以快速构建出一个Redis环境,包括支持shard的集群模式,如下图所示。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)
构建完成后,通过Elasticache提供的终端节点就可以访问cache了。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)
需要注意的是如果选择的Redis是集群模式,那么就得同步升级支持Redis集群模式的客户端SDK用以开发。因为Redis的集群提供的是分片功能,它会把不同的slots分布在不同的节点上,需要由客户端通过CRC16(Key)取模从而计算出数据在哪个节点上。目前可以支持redis集群模式的客户端有很多,比如本文用到的java的jedis以及nodejs的ioredis。
综合a,b两步的示例代码的StreamCacheProcessor.java如下(其余代码参考http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html ):
Java
复制代码


   import java.nio.charset.Charset;   import java.util.HashMap;   import java.util.HashSet;   import java.util.List;   import java.util.Map;   import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.GeoCoordinate import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record;
public class StreamsCacheProcessor implements IRecordProcessor {
private static Log LOG = LogFactory.getLog(StreamsCacheProcessor.class);
private Integer checkpointCounter;
private String clusterHost; private int port; private JedisCluster jedis;
public StreamsCacheProcessor(String clusterHost,int port) { this.clusterHost=clusterHost; this.port=port; }
@Override public void initialize(String shardId) { Set jedisClusterNode=new HashSet(); jedisClusterNode.add(new HostAndPort(clusterHost,port)); jedis=new JedisCluster(jedisClusterNode); checkp ointCounter = 0; } @Override public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); LOG.debug("Received the data as:"+data); if(record instanceof RecordAdapter) com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject(); //新增GPS数据更新到Elasticache中 if(streamRecord.getEventName().equals("INSERT")){ Map coordinateMap = new HashMap(); double longitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("longitude").getN()); double latitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("latitude").getN()); String deviceId = streamRecord.getDynamodb().getNewImage().get("deviceId").getS(); coordinateMap.put(deviceId, new GeoCoordinate(longitude, latitude));
jedis.geoadd("bikes", coordinateMap); LOG.info("Updated "+deviceId+" GPS information as:"+longitude+","+latitude); } } checkpointCounter += 1; if(checkpointCounter % 10 == 0){ //checkpoint大小需根据实际需求调整 try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
}
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if(reason == ShutdownReason.TERMINATE) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }
复制代码


c)在完成地理信息的实时更新后, 可以基于Elasticache的数据利用GEORADIUS搜索周边的资源。使用nodejs示例代码如下:
Java
复制代码


   var express = require('express');   var app = express();   var Redis = require('ioredis');
var cluster = new Redis.Cluster([{ port:6379, host:'lab-cluster.4bh9j8.clustercfg.cnn1.cache.amazonaws.com.cn' }]);
app.get('/bikes', function(req,res){ if(req.query['longitude'] && req.query['latitude']){ console.log ('longitude = %s,latitude = %s',req.query['longitude'],req.query['latitude']); cluster.send_command('GEORADIUS', [ 'bikes',req.query['longitude'],req.query['latitude'],2000, 'm', 'WITHDIST', 'WITHCOORD', 'COUNT', 10], (error, reply) =>{
if (error) { res.status(500).send("无法获取附近车辆信息"); return; }
var stations = reply.map( (r) =>{ return { name: r[0], distance: `${r[1]} m`, coordinates: { latitude: Number(r[2][1]), longitude: Number(r[2][0]) } } });
res.status(200).json(stations); }); } });
var server = app.listen(8080,function(){ var host = server.address().address var port = server.address().port console.log("应用实例,访问地址为 http://%s:%s", host, port) });
复制代码


基于以上代码,服务端就可以返回最近的10个资源以及每个资源离当前位置的距离。例如:  请求  `http://hostname or elb address/bikes?longitude=116&latitude=39.4`  返回
Java
复制代码


   [    {     "name": "48093ba0-f8f1-49f0-b312-285800341b08",     "distance": "1117.8519 m",      "coordinates": {       "latitude": 39.40640623614937,       "longitude": 116.01002186536789    }   },   {     "name": "950fb5df-c0ff-4a95-90ea-2f5f574c5796",     "distance":"1305.5083 m",     "coordinates": {      "latitude": 39.40184880750488,      "longitude": 116.01500004529953     }    },   ……   ]
复制代码


d)通过封装http请求构建手机应用。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map-503x1024.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map.png)
### 总结
Redis Geospatial功能可以让开发者更高效的搜索和计算位置信息的记录。同时,Amazon ElastiCache提供的托管Redis服务大大简化了对于Redis集群的维护工作,包括搭建,备份和迁移等工作。最后,自动扩展的Amazon DynamoDB则负责位置信息数据的持久化和检索,而它的流功能也使得数据能够快速实时的流转起来。


**作者介绍**
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)
赵霏,AWS解决方案架构师。负责基于AWS的云计算方案架构咨询和设计,同时致力于AWS云服务在国内的应用和推广。他拥有超过13年IT行业从业经验,长期专注于企业IT云转型、物联网、移动互联网、Devops等领域,在大规模后台架构、分布式计算和自动化运维等方面有着广泛的设计和实践经验。
复制代码


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/using-amazon-elasticache-to-find-x/


2019-11-13 08:00949

评论

发布
暂无评论
发现更多内容

Vue单文件组件

我搬去水星了

6 月 优质更文活动

Vue中组件的几个重要点

我搬去水星了

6 月 优质更文活动

Vue自定义指令

我搬去水星了

6 月 优质更文活动

华为云新一代分布式数据库GaussDB,给世界一个更优选择

华为云开发者联盟

数据库 后端 华为云 华为云开发者联盟 企业号 6 月 PK 榜

vue-基本操作-收集表单元素绑定的数据

我搬去水星了

6 月 优质更文活动

「悦数图数据库」亮相中国国际信息通信展览会,推进图技术产业化发展

悦数图数据库

通信 图数据库

Java线程池一、基本概念和原理

echoes

Java 线程池

腾讯企点客服赛道国内TOP1!Gartner报告公布最新市场份额

人称T客

隆重共建开放,共享未来 | 2023开放原子全球开源峰会OpenAtom OpenHarmony分论坛即将启幕

开放原子开源基金会

开源 OpenHarmony 开放原子全球开源峰会

中企出海所面临的几点人力资源挑战

用友BIP

中企出海

Vue组件-非单文本组件

我搬去水星了

6 月 优质更文活动

Postman 和 GraphQL:最佳实践指南

Liam

Java Postman API graphql 接口工具

「悦数图数据库」亮相中国国际信息通信展览会,推进图技术产业化发展

悦数图数据库

通信 图数据库 运营商

共享电动单车生产厂家怎么找合适

共享电单车厂家

共享电动车厂家 共享电单车厂商 共享电动车生产 本铯电动车厂家

生态伙伴 | 中电创新科技集聚示范区携手华秋硬创,加速智能硬件孵化

华秋电子

vue-表单元素进阶收集操作

我搬去水星了

6 月 优质更文活动

NFTScan 与 Realy 达成合作伙伴,双方在元宇宙 NFT 数据方面进行深度合作!

NFT Research

NFT Metaverse

DHR数智人力:智能学习加速人才培养与创新

用友BIP

人力资源 人才 数智人力

突破创新!Windows主机助你打造独一无二的网站!

一只扑棱蛾子

Windows主机

Vue-组件的嵌套

我搬去水星了

6 月 优质更文活动

实例讲解Flink 流处理程序编程模型

华为云开发者联盟

开发 华为云 华为云开发者联盟 企业号 6 月 PK 榜

华为云 UCS GitOps:轻松交付多集群云原生应用

华为云开发者联盟

华为云 华为云开发者联盟 企业号 6 月 PK 榜

大模型扎堆「赶考」,语文还是国产AI行,文言文能力超过95%考生

Openlab_cosmoplat

人工智能 机器学习 AI 高考

微服务高并发概念与核心类:调用链上下文与入口类

互联网架构师小马

构建财务共享体系,智能化引领转型升级

用友BIP

财务共享

旭阳数字:让焦化行业供应链更数智

用友BIP

数智平台

面向多告警源,如何构建统一告警管理体系?

阿里巴巴云原生

阿里云 云原生 可观测

【6.02-6.09】写作社区优秀技术博文一览

InfoQ写作社区官方

热门活动 优质创作周报

Vue基本的内置指令

我搬去水星了

6 月 优质更文活动

Excelize 荣获 2022 年中国开源创新大赛一等奖

xuri

开源 编程 开发者 创新 Excelize

电源管理IC下游市场向高端工业和汽车领域转型,这家芯片设计厂商值得关注

华秋电子

利用Amazon ElastiCache寻找附近的X_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章