HarmonyOS开发者限时福利来啦!最高10w+现金激励等你拿~ 了解详情
写点什么

利用 Amazon ElastiCache 寻找附近的 X

  • 2019-11-13
  • 本文字数:5023 字

    阅读完需:约 16 分钟

利用Amazon ElastiCache寻找附近的X
基于地理信息的应用已经越来越深入到日常生活中,人们经常会在应用中寻找附近的朋友,车,餐厅或其它资源。而与此同时,随着物理网技术及设备的普及,应用需要更加实时和精确的处理来自各种数据源(包括用户手机,各种传感器设备及其他系统)的大量数据,以完成相关的搜索和计算距离等操作。


### 架构
对于开发者来说,Redis因为其性能上的优势往往会被采用作为位置数据的缓存,只是在3.2版本之前需要代码中把位置数据进行Geohash后才能有效的排序和分析。不过3.2版本后,Redis已经能够原生支持基于位置信息的存储,计算及搜索了。Amazon ElastiCache是AWS提供的托管型的数据缓存服务,借助该服务,用户能够在云中轻松部署、运行和扩展分布式内存数据存储或缓存。 Amazon ElastiCache 的Redis引擎是一项与 Redis 兼容的内存服务,兼具 Redis 的易用性和强大功能,同时还可为要求最苛刻的应用程序提供适用的可用性、可靠性和性能,提供单节点和多达 15 个分片的群集,从而可将内存数据扩展到高达 3.55TiB。这里,我们可以基于Elasticache并结合AWS其他服务构建出以下的示例架构:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1-1024x717.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-1.png)
1)终端设备获取GPS位置信息,定时或基于事件将数据上传到云端。在AWS上可以选择使用IoT或Kinesis等托管型服务作为数据收集的接收端,也可以使用部署在EC2/Lambda上的自定义服务。
2)所有位置信息写入可以自动扩展的DynamoDB,基本Schema包含设备Id/Timestamp/Geo location, 方便历史查询或轨迹查询。
3)打开DynamoDB流,用KCL或Lambda监听DynamoDB的数据改变,并将当前变化的位置数据更新到Elasticache中建立基于Geospatial的索引缓存。
4)手机应用搜索附近资源时,部署在EC2/Lambda的查询服务利用Elasticache geospatial直接获取结果。
### 实现
如前文所述,步骤1和2可选择的方案很多,比如采用AWS IoT服务甚至可以无需任何代码仅通过配置即可完成云端的功能讲数据实时写入相应的DynamoDB表中。因此,本文将着重介绍如何实现前文架构中的3和4步:
a) 打开DynamoDB 流,获取流的ARN用于读取,如下图:
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2-1024x662.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-2.png)
读取DynamoDB流数据有三种方式:利用Kinesis adapter,利用低级别API以及利用Lambda函数来进行读取。从易用性的角度来说,当然是Lambda函数最简单,不需要考虑shard,吞吐和checkpoint等问题而专注于业务逻辑。但是Lambda函数并不是在所有的AWS区域都支持,因此本文采用第一种方式利用Kinesis adapter完成读取。具体参考文档:http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.html
b) 在读取流的同时,我们需要将最新的地理位置信息利用GEOADD更新到Elasticache中。前文提到Redis在3.2版本后,Geospatial Indexing已经被原生支持,而它实际上是Sorted List数据结构的一种扩展,即排序 key扩展成了经纬度,如下图所示的数据结构,并且可以方便的使用基于地理信息的API,例如GEOADD——添加地理位置 。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3-1024x204.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-3.png)
通过Elasticache可以快速构建出一个Redis环境,包括支持shard的集群模式,如下图所示。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-4.png)
构建完成后,通过Elasticache提供的终端节点就可以访问cache了。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/20/20170721-5.png)
需要注意的是如果选择的Redis是集群模式,那么就得同步升级支持Redis集群模式的客户端SDK用以开发。因为Redis的集群提供的是分片功能,它会把不同的slots分布在不同的节点上,需要由客户端通过CRC16(Key)取模从而计算出数据在哪个节点上。目前可以支持redis集群模式的客户端有很多,比如本文用到的java的jedis以及nodejs的ioredis。
综合a,b两步的示例代码的StreamCacheProcessor.java如下(其余代码参考http://docs.amazonaws.cn/amazondynamodb/latest/developerguide/Streams.KCLAdapter.Walkthrough.CompleteProgram.html ):
Java
复制代码


   import java.nio.charset.Charset;   import java.util.HashMap;   import java.util.HashSet;   import java.util.List;   import java.util.Map;   import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory;
import redis.clients.jedis.GeoCoordinate import redis.clients.jedis.HostAndPort; import redis.clients.jedis.JedisCluster;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.model.Record;
public class StreamsCacheProcessor implements IRecordProcessor {
private static Log LOG = LogFactory.getLog(StreamsCacheProcessor.class);
private Integer checkpointCounter;
private String clusterHost; private int port; private JedisCluster jedis;
public StreamsCacheProcessor(String clusterHost,int port) { this.clusterHost=clusterHost; this.port=port; }
@Override public void initialize(String shardId) { Set jedisClusterNode=new HashSet(); jedisClusterNode.add(new HostAndPort(clusterHost,port)); jedis=new JedisCluster(jedisClusterNode); checkp ointCounter = 0; } @Override public void processRecords(List records, IRecordProcessorCheckpointer checkpointer) { for (Record record : records) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); LOG.debug("Received the data as:"+data); if(record instanceof RecordAdapter) com.amazonaws.services.dynamodbv2.model.Record streamRecord = ((RecordAdapter) record).getInternalObject(); //新增GPS数据更新到Elasticache中 if(streamRecord.getEventName().equals("INSERT")){ Map coordinateMap = new HashMap(); double longitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("longitude").getN()); double latitude = Double.parseDouble(streamRecord.getDynamodb().getNewImage().get("latitude").getN()); String deviceId = streamRecord.getDynamodb().getNewImage().get("deviceId").getS(); coordinateMap.put(deviceId, new GeoCoordinate(longitude, latitude));
jedis.geoadd("bikes", coordinateMap); LOG.info("Updated "+deviceId+" GPS information as:"+longitude+","+latitude); } } checkpointCounter += 1; if(checkpointCounter % 10 == 0){ //checkpoint大小需根据实际需求调整 try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } }
}
@Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if(reason == ShutdownReason.TERMINATE) { try { checkpointer.checkpoint(); } catch (Exception e) { e.printStackTrace(); } } } }
复制代码


c)在完成地理信息的实时更新后, 可以基于Elasticache的数据利用GEORADIUS搜索周边的资源。使用nodejs示例代码如下:
Java
复制代码


   var express = require('express');   var app = express();   var Redis = require('ioredis');
var cluster = new Redis.Cluster([{ port:6379, host:'lab-cluster.4bh9j8.clustercfg.cnn1.cache.amazonaws.com.cn' }]);
app.get('/bikes', function(req,res){ if(req.query['longitude'] && req.query['latitude']){ console.log ('longitude = %s,latitude = %s',req.query['longitude'],req.query['latitude']); cluster.send_command('GEORADIUS', [ 'bikes',req.query['longitude'],req.query['latitude'],2000, 'm', 'WITHDIST', 'WITHCOORD', 'COUNT', 10], (error, reply) =>{
if (error) { res.status(500).send("无法获取附近车辆信息"); return; }
var stations = reply.map( (r) =>{ return { name: r[0], distance: `${r[1]} m`, coordinates: { latitude: Number(r[2][1]), longitude: Number(r[2][0]) } } });
res.status(200).json(stations); }); } });
var server = app.listen(8080,function(){ var host = server.address().address var port = server.address().port console.log("应用实例,访问地址为 http://%s:%s", host, port) });
复制代码


基于以上代码,服务端就可以返回最近的10个资源以及每个资源离当前位置的距离。例如:  请求  `http://hostname or elb address/bikes?longitude=116&latitude=39.4`  返回
Java
复制代码


   [    {     "name": "48093ba0-f8f1-49f0-b312-285800341b08",     "distance": "1117.8519 m",      "coordinates": {       "latitude": 39.40640623614937,       "longitude": 116.01002186536789    }   },   {     "name": "950fb5df-c0ff-4a95-90ea-2f5f574c5796",     "distance":"1305.5083 m",     "coordinates": {      "latitude": 39.40184880750488,      "longitude": 116.01500004529953     }    },   ……   ]
复制代码


d)通过封装http请求构建手机应用。
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map-503x1024.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/08/01/map.png)
### 总结
Redis Geospatial功能可以让开发者更高效的搜索和计算位置信息的记录。同时,Amazon ElastiCache提供的托管Redis服务大大简化了对于Redis集群的维护工作,包括搭建,备份和迁移等工作。最后,自动扩展的Amazon DynamoDB则负责位置信息数据的持久化和检索,而它的流功能也使得数据能够快速实时的流转起来。


**作者介绍**
[](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)](https://d2908q01vomqb2.awsstatic-china.com/472b07b9fcf2c2451e8781e944bf5f77cd8457c8/2017/07/24/Zhao-Fei-Mini.png)
赵霏,AWS解决方案架构师。负责基于AWS的云计算方案架构咨询和设计,同时致力于AWS云服务在国内的应用和推广。他拥有超过13年IT行业从业经验,长期专注于企业IT云转型、物联网、移动互联网、Devops等领域,在大规模后台架构、分布式计算和自动化运维等方面有着广泛的设计和实践经验。
复制代码


本文转载自 AWS 技术博客。


原文链接:


https://amazonaws-china.com/cn/blogs/china/using-amazon-elasticache-to-find-x/


2019-11-13 08:00867

评论

发布
暂无评论
发现更多内容

6000字,详解数据仓库明星产品背后的技术奥秘

百度开发者中心

数据库 大数据

Camtasia混音教程

淋雨

Camtasia

国产分布式数据库StarDB核心技术大揭秘 一:内核分解之数据分片

京东科技开发者

数据库

Python Qt GUI设计:如何调整组件布局比例?(拓展篇—1)

不脱发的程序猿

Python PyQt GUI设计 上位机 调整组件布局比例

ReactiveNetwork库时如何实现网络状态监听的

Changing Lin

12月日更

2022年,RPA的5大发展趋势

金小K

区块链 AI RPA 机器人流程自动化 人工智能「

和12岁小同志搞创客开发:手撕代码,做一款温湿度检测器

不脱发的程序猿

少儿编程 智能硬件 温度传感器 创客开发 Arduino

和12岁小同志搞创客开发:手撕代码,做一款密室自动门

不脱发的程序猿

少儿编程 传感器 智能硬件 创客开发 Arduino

百度智能云与英特尔携手举办2021 EdgeX中国挑战赛成功落幕

百度大脑

人工智能

以容器的方式运行极狐GitLab Runner

极狐GitLab

Docker runner 极狐GitLab

开源demo| 智慧协同让企业更便利

anyRTC开发者

音视频 智慧协同 开源demo 远程协助 远程勘查

WAVE SUMMIT+2021为开发者准备的“小心思”,你get到了吗

科技热闻

如何避免产品Backlog的这七个常见错误

Scrum中文网

Scrum 敏捷开发 研发管理 需求管理 内容合集

AfterShip APP 项目数据驱动的演进

AfterShip

数据库 数据 数据驱动

架构训练营-模块一作业

伊静西蒙

国产分布式数据库StarDB核心技术大揭秘二:智能运维管控

京东科技开发者

数据库

消费医疗门诊的数字化运营

boshi

随笔杂谈

WePack —— 助力企业渐进式 DevOps 转型

CODING DevOps

统一管理 WePack 制品管理 研发构建产物 安全管控

库存管理系统到底有什么作用?

低代码小观

CRM 企业管理系统 ERP 库存 CRM系统

元宇宙与电信运营商

CECBC

数据大屏rem适配方案

CRMEB

新思科技推动DevSecOps落地,帮助企业走出“安全孤岛”

InfoQ_434670063458

DevSecOps 新思科技 软件安全

MySQL锁的分析实战

卢卡多多

28天写作 MySQL 数据库 锁分析 签约计划第二季 12月日更

五分钟,让你明白MySQL是怎么选择索引《死磕MySQL系列 六》

咔咔

MySQL MySQL高级 索引选择而

Go语言学习查缺补漏ing Day5

恒生LIGHT云社区

golang 编程语言

底层逻辑:变化背后的不变

石云升

读书笔记 28天写作 12月日更

基于云的技术架构设计实践-第4篇

hackstoic

运维 云原生 签约计划第二季 业务运维

Android C++系列:Linux网络(一)网络模型

轻口味

android 28天写作 12月日更

Web3.0时代的社交网络会有哪些新变化?

CECBC

Linux一学就会之Centos8系统进程管理 ps管理进程

学神来啦

Linux 运维 linux一学就会 uptime centos8

你知道敏捷团队的迭代目标达成率该是多少吗?

Scrum中文网

Scrum 敏捷开发 研发管理 内容合集 迭代管理

利用Amazon ElastiCache寻找附近的X_语言 & 开发_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章