Skip to main content

并发读写外部依赖

在我们以task为粒度的分布式系统中,需要对数据做checkpoint,以保证task在重启的时候内存中未被消费完的数据可以保存到文件系统中以防止丢失。
在第一个版本中,我们的task默认使用了task的名字作为任务的ID,同时在做checkpoint时使用diskQueue也是使用相同的方式来生成。直到我们碰到一个需要更新task配置的需求后产生了问题。
为了解决任务更新的问题,我们task的做了使用时间戳作为版本管理,简单就是说:在更新一个任务的配置后,version会替换,这时候会生成一个新的task(旧的任务在消费完数据后会被cleanTask的任务给删除掉)来继续。
在过渡期的时候,即新旧task同时在运行的时候,我们做了重启操作,此时两个任务同时执行了checkpoint操作。而bug的原因是在于,task的ID和diskQueue的ID是不相同的!也就是说两个任务虽然new出两个diskQ,但是两个diskQ会同时向一个文件中写数据,这就导致了文件的损坏。
这里的反思是:对于一个Task任务,如果需要和外部的文件或者其他资源交互时,一定需要保证外部的依赖对于每一个task任务都是唯一的。这里以fileSystem为例子,一个task保证对应的是一个file or dir。两种方式:1.使用一个xxx.lock的方式,一个系统如果已经决定对该资源做write/read操作时,就建立一个lock。该系统内部的进程想要同时做操作时可以避免因为上述简单的ID BUG而造成的问题。同时其他系统可以辨识到该文件可能被其他应用程序使用中,他可以针对这种情况做一些预期内的操作。
  • Golang中突然想起一种方案,对于需要写文件或者其他资源访问时,使用一个channel来做串行处理。比如当多个不可预知的任务可能同时做一个写入文件操作时,任务可以将此次操作的metadata以一种特定的数据格式交给上层系统(我们定义的channel)来统一处理,因为channel的并发写入是绝对安全的。当然如果是需要对多个文件做写入操作时,我们可以使用这样一种方式: 一个channel对应下游有多个channel(而不是file对象),每个channel都定义一个唯一ID,作为suffix。每个channel写完的文件都有ID,比如A,B,C三个channel现在得到的metadata是:向task.txt中写入数据,如果直接同时写,需要自己控制锁,我们使用task.txt.A task.txt.B task.txt.C的方式来写入,即channel A写入task.txt.A 中,B->B中。 再任务读取时,生成这样的3个channel并发读取到内存的总线channel。

Comments

Popular posts from this blog

学习服务器配置之路~~

第一个常见的小问题:MySQL安装 os : fedora 20 mysql: mysql-server(5.5) 所有假设你的系统是没有经过特殊配置的。 1: yum install mysql-server 2: mysql 报错:socket连接不上 3: service mysqld start   注意这步是 mysqld 不是mysql 这样就解决。网上的方法好像有点麻烦。 第二个小问题:解压一些文件(.tar.gz)时报错 http://itsfoss.com/how-solve-stdin-gzip-format/ 上面介绍的很清楚,总之要先确认你下载的文件类型。 第三个小问题。配置tomcat服务器 主要问题是比如我的域名是 cqupt.me 而你tomcat服务器的项目在/webapps/{your projectname} 这时你很蛋疼的要 cqupt.me:8080/{your projectname}/index.html。 如果要cqupt.me就可以完成。这样配置: 都是在tomcat下/conf/server.xml 第一步端口。简单 不废话 第二部。 <Host name="localhost" appBase="webapps" unpackWARs="true" autoDeploy="true" xmlValidation="false" xmlNamespaceAware="false"> </Host> 在标签中间插入: <Context path=""  docBase="xbwl"  debug="0" reloadable="true"/> docBase="xbwl" xbwl 即为指定的项目。即({your projectname}_ 完整如下: <Host name="localhost" appBase="webapps" ...

Golang http server performance tuning practice (Golang HTTP服务器性能调优实践)

  Golang 1.8后本身自带了pprof的这个神器,可以帮助我们很方便的对服务做一个比较全面的profile。对于一个Golang的程序,可以从多个方面进行profile,比如memory和CPU两个最基本的指标,也是我们最关注的,同时对特有的goroutine也有运行状况profile。关于golang profiling本身就不做过多的说明,可以从 官方博客 中了解到详细的过程。   Profile的环境 Ubuntu 14.04.4 LTS (GNU/Linux 3.19.0-25-generic x86_64) go version go1.9.2 linux/amd64  profile的为一个elassticsearch数据导入接口,承担接受上游数据,根据元数据信息写入相应的es索引中。目前的状况是平均每秒是1.3Million的Doc数量。   在增加了profile后,从CPU层面发现几个问题。 runtime mallocgc 占用了17.96%的CPU。 SVG部分图如下 通过SVG图,可以看到调用链为: ioutil.ReadAll -> buffer.ReadFrom -> makeSlice -> malloc.go  然后进入ReadAll的源码。 readAll()方法 func readAll(r io.Reader, capacity int64) (b []byte, err error) { buf := bytes . NewBuffer ( make ([] byte , 0 , capacity )) // If the buffer overflows, we will get bytes.ErrTooLarge. // Return that as an error. Any other panic remains.   defer func() { e := recover () if e == nil { return } if panicErr , ok := e .( error ) ; ok && p...

Python 使用socket实现ftp 客服端

之前先了解ftp协议,然后解释代码 连接到ftp服务器,得到一个socket(这是一个连接到ftp命令端口socket) 发送必要request 第一步 Connect到服务器后,ftp_socket.recv(1024) 到服务器的欢迎消息(1 中的socket),不要问为什么,ftp协议规定的,应该是。 *注意的是,后面ftp_socket每一次的请求后,都要recv一次,不管你是否全部接受到了都要recv一次,不然可能后面接受不到一些消息。个人觉着这可能是ftp协议的规定:每一次request,都会给client一个response。如果 client没有接受这个response,那么下次的request不会被服务器接受,所以client的recv就会卡住! 第二步就像代码中 直到 #LIST 都是用的命令端口。 而使用数据端口时,就是用命令端口   ftp_socket.send("PASV \r\n")     new_port = ftp_socket.recv(1024)      使用命令 PASV 请求一个动态的数据端口。 解释 我理解的动态数据端口: 即你每一次请求到一个数据端口后,你只能使用一次。比如: [ INFO] 2014-11-29 22:25:44,682 [admin] [127.0.0.1] SENT: 227 Entering Passive Mode (127,0,0,1,204,82)    这是ftp服务器端发送的请求(apache ftpserver),明显括号中是你的ip(我的是本机),然后两个数字。通过查询,你要请求的数据端口:(a,b,c,d,x,y)  new_port  = x*250+y 剩下的部分就很简单了。在代码中再有点解释(后面附server端log) 最后还应该用发送一个quit命令,告诉server 我的请求完毕了。这里忘了。 一点补充:看到server端log可以发现,服务器每一次的SEND  我们都应该recv一次 #download a folder's files  import socket class ParseUrl()...