Skip to main content

并发读写外部依赖

在我们以task为粒度的分布式系统中,需要对数据做checkpoint,以保证task在重启的时候内存中未被消费完的数据可以保存到文件系统中以防止丢失。
在第一个版本中,我们的task默认使用了task的名字作为任务的ID,同时在做checkpoint时使用diskQueue也是使用相同的方式来生成。直到我们碰到一个需要更新task配置的需求后产生了问题。
为了解决任务更新的问题,我们task的做了使用时间戳作为版本管理,简单就是说:在更新一个任务的配置后,version会替换,这时候会生成一个新的task(旧的任务在消费完数据后会被cleanTask的任务给删除掉)来继续。
在过渡期的时候,即新旧task同时在运行的时候,我们做了重启操作,此时两个任务同时执行了checkpoint操作。而bug的原因是在于,task的ID和diskQueue的ID是不相同的!也就是说两个任务虽然new出两个diskQ,但是两个diskQ会同时向一个文件中写数据,这就导致了文件的损坏。
这里的反思是:对于一个Task任务,如果需要和外部的文件或者其他资源交互时,一定需要保证外部的依赖对于每一个task任务都是唯一的。这里以fileSystem为例子,一个task保证对应的是一个file or dir。两种方式:1.使用一个xxx.lock的方式,一个系统如果已经决定对该资源做write/read操作时,就建立一个lock。该系统内部的进程想要同时做操作时可以避免因为上述简单的ID BUG而造成的问题。同时其他系统可以辨识到该文件可能被其他应用程序使用中,他可以针对这种情况做一些预期内的操作。
  • Golang中突然想起一种方案,对于需要写文件或者其他资源访问时,使用一个channel来做串行处理。比如当多个不可预知的任务可能同时做一个写入文件操作时,任务可以将此次操作的metadata以一种特定的数据格式交给上层系统(我们定义的channel)来统一处理,因为channel的并发写入是绝对安全的。当然如果是需要对多个文件做写入操作时,我们可以使用这样一种方式: 一个channel对应下游有多个channel(而不是file对象),每个channel都定义一个唯一ID,作为suffix。每个channel写完的文件都有ID,比如A,B,C三个channel现在得到的metadata是:向task.txt中写入数据,如果直接同时写,需要自己控制锁,我们使用task.txt.A task.txt.B task.txt.C的方式来写入,即channel A写入task.txt.A 中,B->B中。 再任务读取时,生成这样的3个channel并发读取到内存的总线channel。

Comments

Popular posts from this blog

Scrapy ERROR :ImportError: Error loading object 'scrapy.telnet.TelnetConsole': No module named conch

原文: https://stackoverflow.com/questions/17263509/error-while-using-scrapy-scrapy-telnet-telnetconsole-no-module-named-conch/17264705#17264705 On Ubuntu, you should avoid using  easy_install  wherever you can. Instead, you should be using  apt-get ,  aptitude , "Ubuntu Software Center", or another of the distribution-provided tools. For example, this single command is all you need to install scrapy - along with every one of its dependencies that is not already installed: $ sudo apt - get install python - scrapy easy_install  is not nearly as good at installing things as  apt-get . Chances are the reason you can't get it to work is that it didn't quite install things sensibly, particularly with respect to what was already installed on the system. Sadly, it also leaves no record of what it did, so uninstallation is difficult or impossible. You may now have a big mess on your system that prevents proper installations from working as well (or maybe not...

Elasticsearch error when the field exceed limit 32kb

When we post data that the field exceed the limit, elasticsearch will reject the data which error: {"error":{"root_cause":[{"type":"remote_transport_exception","reason":"[cs19-2][10.200.20.39:9301][indices:data/write/index]"}],"type":"illegal_argument_exception","reason":"Document contains at least one immense term in field=\"field1\" (whose UTF8 encoding is longer than the max length 32766), all of which were skipped.  Please correct the analyzer to not produce such terms.  The prefix of the first immense term is You can handle this: 1. you can udpate the mapping part at any time PUT INDEXNAME/_mapping/TYPENAME { "properties" : { "logInfo" : { "type" : "string" , "analyzer" : "keyword" , "ignore_above" : 32766 } } } ignore_above means that we will only keep 32766 bytes 2...

Python 使用socket实现ftp 客服端

之前先了解ftp协议,然后解释代码 连接到ftp服务器,得到一个socket(这是一个连接到ftp命令端口socket) 发送必要request 第一步 Connect到服务器后,ftp_socket.recv(1024) 到服务器的欢迎消息(1 中的socket),不要问为什么,ftp协议规定的,应该是。 *注意的是,后面ftp_socket每一次的请求后,都要recv一次,不管你是否全部接受到了都要recv一次,不然可能后面接受不到一些消息。个人觉着这可能是ftp协议的规定:每一次request,都会给client一个response。如果 client没有接受这个response,那么下次的request不会被服务器接受,所以client的recv就会卡住! 第二步就像代码中 直到 #LIST 都是用的命令端口。 而使用数据端口时,就是用命令端口   ftp_socket.send("PASV \r\n")     new_port = ftp_socket.recv(1024)      使用命令 PASV 请求一个动态的数据端口。 解释 我理解的动态数据端口: 即你每一次请求到一个数据端口后,你只能使用一次。比如: [ INFO] 2014-11-29 22:25:44,682 [admin] [127.0.0.1] SENT: 227 Entering Passive Mode (127,0,0,1,204,82)    这是ftp服务器端发送的请求(apache ftpserver),明显括号中是你的ip(我的是本机),然后两个数字。通过查询,你要请求的数据端口:(a,b,c,d,x,y)  new_port  = x*250+y 剩下的部分就很简单了。在代码中再有点解释(后面附server端log) 最后还应该用发送一个quit命令,告诉server 我的请求完毕了。这里忘了。 一点补充:看到server端log可以发现,服务器每一次的SEND  我们都应该recv一次 #download a folder's files  import socket class ParseUrl()...