本文介绍了为什么使用iteratee IO的我的Mapreduce实现(真实世界haskell)也失败,出现“太多打开的文件”的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧! 问题描述 29岁程序员,3月因学历无情被辞! 我正在执行一个haskell程序,它将文件的每一行与文件中的每一行进行比较。可以实现单线程,如下所示: distance:Int - > Int - > Int distance a b =(a-b)*(a-b) sumOfDistancesOnSmallFile :: FilePath - > IO Int sumOfDistancesOnSmallFile path = do fileContents< - readFile path return $ allDistances $ map read $ lines $ fileContents where allDistances(x:xs)= (allDistances xs)+(sum $ map(distance x)xs) allDistances _ = 0 因此,对上述单线程解决方案有两个改进。首先,加快实际运行时间。其次,找到一种方法,不要将整个列表全部留在内存中。我知道这需要解析整个文件n次。因此将会有O(n ^ 2)比较,并且解析O(n ^ 2)行。这对我来说是好的,因为我宁愿有一个缓慢的成功程序而不是失败的程序。当输入文件足够小时,我总是可以驻留到更简单的版本。 要使用多个cpu核心,我将Mapreduce实现从Real World Haskell中移出(第24章, here )。 我将本书中的分块函数修改为,而不是将整个文件分成块,返回尽可能多的块,每块代表中的一个元素 尾巴。线。 readFile 因为我希望程序也可以以文件大小进行扩展,所以我最初使用懒惰IO 。然而,这个失败与太多打开的文件,关于我在 正如接受的答案所解释的那样, strict IO 可以解决问题。这确实解决了2k行文件的太多打开的文件问题,但是在50k文件中内存不足。 请注意,第一个单线程实现(没有mapreduce)能够处理一个50k文件。 另一种解决方案对我来说最吸引人的地方是使用迭代IO 。我预计这将解决文件句柄和内存资源耗尽。然而,我的实现仍然在2k行文件上出现太多打开的文件错误。 $ b iteratee IO版本具有相同的 mapReduce 函数在书中,但有一个修改 chunkedFileEnum ,让它与一个枚举器。 因此,我的问题是;以下迭代IO基本实现有什么问题? 导入Control.Monad.IO.Class(liftIO)导入Control.Monad。 Trans(MonadIO,liftIO)导入System.IO 导入限定的Data.Enumerator.List作为EL 导入限定的Data.Enumerator.Text作为ET 导入数据.Enumerator隐藏(映射,过滤器,头,序列) 导入Data.Text(文本)导入Data.Text.Read 导入Data.Maybe 导入合格的Data.ByteString.Char8作为Str 导入Control.Exception(括号,最后)导入Control.Monad(forM,liftM)导入Control.Parallel.Strategies import Control.Parallel import Control.DeepSeq(NFData) import Data.Int(Int64) $ b $ - 目标:在有n个值的文件中,计算总和的所有n *(n-1)/ 2平方距离 - 我为一个值对操作 distance :: Int - > Int - > Int distance a b =(a-b)*(a-b) combineDistances :: [Int] - > Int combineDistances = sum - 测试文件生成 createTestFile :: Int - > FilePath - > IO() createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1 where infiniteList :: Int-> Int-> [Int] infiniteList ij =(i + j):infiniteList j(i + j) - 将我的操作简单地应用于文件 - (其实不会抛出一个由createTestFile 50000生成的文件内存不足) - 但我想使用多个核心。 sumOfDistancesOnSmallFile :: FilePath - > IO Int sumOfDistancesOnSmallFile path = do fileContents< - readFile path return $ allDistances $ map read $ lines $ fileContents where allDistances(x:xs)= (allDistances xs)+(sum $ map(distance x)xs) allDistances _ = 0 - 设置文本流读取值的枚举数 readerEnumerator: :Monad m =>积分a =>阅读器a - >步骤a m b - > Iteratee Text m b readerEnumerator reader = joinI。 (EL.concatMapM变换器)其中变压器输入=案例阅读器输入右(val,余数) - > return [val] Left err - > return [0] readEnumerator :: Monad m => Integral a =>步骤a m b - > Iteratee Text m b readEnumerator = readerEnumerator(带符号的十进制) - 我操作的迭代版本 distanceFirstToTailIt :: Monad m => Iteratee Int m Int distanceFirstToTailIt = do mayNum< - EL.head 也许(返回0)distanceOneToManyIt mayNum distanceOneToManyIt :: Monad m => Int - > Iteratee Int m Int distanceOneToManyIt base = do mayNum< - EL.head maybe(return 0)combineNextDistance maybeNum where combineNextDistance nextNum = do rest< - 距离1到多个基数返回$ combineDistances [(distance base nextNum),rest] - 映射精简算法 mapReduce ::策略b - 映射$ b $的评估策略b - > (a - > b) - 映射函数 - >策略c - 减少评估策略 - > ([b] - > c) - 减少功能 - > [a] - 列表映射到 - > c mapReduce mapStrat mapFunc reduceStrat reduceFunc输入= mapResult`pseq` reduceResult 其中mapResult = parMap mapStrat mapFunc输入 reduceResult = reduceFunc mapResult`使用`reduceStrat - 使用mapreduce 应用iteratee操作sumOfDistancesOnFileWithIt :: FilePath - > IO Int sumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails(spacingUsingMapReduceIt)path 距离使用MapReduceIt :: [Enumerator Text IO Int] - > IO int b $ b distanceUsingMapReduceIt = mapReduce rpar(runEnumeratorAsMapFunc) rpar(sumValuesAsReduceFunc)其中runEnumeratorAsMapFunc :: Enumerator Text IO Int - > IO Int runEnumeratorAsMapFunc =(\source-> run_(source $$ readEnumerator $$ spacingFirstToTailIt)) sumValuesAsReduceFunc :: [IO Int] - > IO Int sumValuesAsReduceFunc = liftM sum。序列 - 使用(文件)块枚举器: data ChunkSpec = CS { chunkOffset ::!Int ,chunkLength ::! (Eq,Show) chunkedFileEnum ::(NFData(a))=> MonadIO m => (FilePath-> IO [ChunkSpec]) - > ([Enumerator Text m b] - > IO a) - > FilePath - > IO a chunkedFileEnum chunkCreator funcOnChunks path = do (chunks,handles)r (rdeepseq r`seq`(return r) )`finally` mapM_ hClose句柄 chunkedEnum :: MonadIO m => (FilePath - > IO [ChunkSpec]) - > FilePath - > IO([Enumerator Text m b],[Handle]) chunkedEnum chunkCreator path = do chunks< - chunkCreator路径 liftM解压缩。 forM chunks $ \spec - >做h< - openFile路径ReadMode hSeek h AbsoluteSeek(fromIntegral(chunkOffset spec)) let chunk = ET.enumHandle h - 注意:chunklength没有考虑到,所以只是为了EOF return(chunk,h) - 返回表示尾部的块。线。 readFile chunkByLinesTails :: FilePath - > IO [ChunkSpec] chunkByLinesTails path = do 括号(openFile路径ReadMode)hClose $ \h-> do totalSize< - fromIntegral`liftM` hFileSize h let chunkSize = 1 findChunks offset = do let newOffset = offset + chunkSize hSeek h AbsoluteSeek(fromIntegral newOffset) let findNewline lineSeekOffset = do eof if eof then return [CS offset(totalSize - offset)] else do 字节< - Str.hGet h 256 case 的Str.elemIndex'\\\'字节只是n - > nextChunks< - findChunks(lineSeekOffset + n + 1) return(CS offset(totalSize-offset):nextChunks) Nothing - > findNewline(lineSeekOffset + Str.length bytes) findNewline newOffset findChunks 0 顺便说一句,我运行 HaskellPlatform 2011.2.0 Mac OS X 10.6.7(雪豹) 与下列软件包: bytestring 0.9.1.10 > parallel 3.1.0.1 enumerator 0.4.8,带手动这里 解决方案由于错误提示,打开的文件太多。我希望Haskell能够顺序地运行大部分程序,但有些火花并行。然而,正如sclv所提到的,Haskell总是激发评估。 这在纯函数式程序中通常不是问题,而是在处理IO(资源)时。我把真实世界Haskell书中所描述的并行化扩展得太高了。所以我的结论是,在处理火花内的IO资源时,只能在有限的范围内进行并行处理。在纯功能部分,过度的并行可能会成功。 因此,对于我的文章的回答是,在整个程序中不使用MapReduce,而是在内部的纯功能部分。 为了显示程序实际失败的位置,我使用--enable-executable-profiling -p配置了它,并使用+ RTS -p运行它, hc -L30。由于可执行文件立即失败,因此没有内存分配配置文件。 .prof文件中的结果时间分配配置文件从以下开始: 单个继承成本中心模块编号。项目%时间%分配%时间%分配 主要主要1 0 0.0 0.3 100.0 100.0 主要主要1648 2 0.0 0.0 50.0 98.9 sumOfDistancesOnFileWithIt MapReduceTest 1649 1 0.0 0.0 50.0 98.9 chunkedFileEnum MapReduceTest 1650 1 0.0 0.0 50.0 98.9 chunkedEnum MapReduceTest 1651 495 0.0 24.2 50.0 98.9 lineOffsets MapReduceTest 1652 1 50.0 74.6 50.0 74.6 chunkedEnum返回IO([Enumerator Text mb],[Handle]),并且显然接收到495个条目。输入文件是一个2k行文件,因此lineOffset上的单个条目返回了2000个偏移量的列表。距离使用MapReduceIt没有一个条目,所以实际工作甚至没有开始! I am implementing a haskell program wich compares each line of a file with each other line in the file. Which can be implemented single threaded as followsdistance :: Int -> Int -> Intdistance a b = (a-b)*(a-b)sumOfDistancesOnSmallFile :: FilePath -> IO IntsumOfDistancesOnSmallFile path = do fileContents <- readFile path return $ allDistances $ map read $ lines $ fileContents where allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs) allDistances _ = 0This will run in O(n^2) time, and has to keep the complete list of integers in memory the whole time. In my actual program the line contains more numbers, out of which I construct a slightly complexer datatype than Int. This gave me out of memory errors on the data I have to process.So there are two improvements to be made to the above-mentioned single threaded solution. First, speed up the actual running time. Second, find a way to not keep the whole list in memory the full time. I know this requires parsing the complete file n times. Thus there will be O(n^2) comparisons, and O(n^2) lines parsed. This is OK for me as I'd rather have a slow successful program than a failing program. When the input file is small enough I can always reside to a simpler version.To use multiple cpu cores I took the Mapreduce implementation out of Real World Haskell (chapter 24, available here).I modified the chunking function from the book to, instead of dividing the complete file in chunks, return as many chunks as lines with each chunk representing one element oftails . lines . readFileBecause I want the program also to be scalable in file-size, I initially used lazy IO. This however fails with "Too many open files", about which I asked in a previous question (the file handles were disposed too late by the GC). The full lazy IO version is posted there.As the accepted answer explains, strict IO could solve the issue. That indeed solves the "Too many open files" problem for 2k line files, but fails with "out of memory" on a 50k file.Note that the first single threaded implementation (without mapreduce) is capable of handling a 50k file.The alternative solution, which also appeals most to me, is to use iteratee IO. I expected this to solve both the file handle, and memory resource exhaustion. My implementation however still fails with a "Too many open files" error on a 2k line file.The iteratee IO version has the same mapReduce function as in the book, but has a modified chunkedFileEnum to let it work with an Enumerator.Thus my question is; what is wrong with the following iteratee IO base implementation? Where is the Laziness?.import Control.Monad.IO.Class (liftIO)import Control.Monad.Trans (MonadIO, liftIO)import System.IOimport qualified Data.Enumerator.List as ELimport qualified Data.Enumerator.Text as ETimport Data.Enumerator hiding (map, filter, head, sequence)import Data.Text(Text)import Data.Text.Readimport Data.Maybeimport qualified Data.ByteString.Char8 as Strimport Control.Exception (bracket,finally)import Control.Monad(forM,liftM)import Control.Parallel.Strategiesimport Control.Parallelimport Control.DeepSeq (NFData)import Data.Int (Int64)--Goal: in a file with n values, calculate the sum of all n*(n-1)/2 squared distances--My operation for one value pairdistance :: Int -> Int -> Intdistance a b = (a-b)*(a-b)combineDistances :: [Int] -> IntcombineDistances = sum--Test file generationcreateTestFile :: Int -> FilePath -> IO ()createTestFile n path = writeFile path $ unlines $ map show $ take n $ infiniteList 0 1 where infiniteList :: Int->Int-> [Int] infiniteList i j = (i + j) : infiniteList j (i+j)--Applying my operation simply on a file--(Actually does NOT throw an Out of memory on a file generated by createTestFile 50000)--But i want to use multiple cores..sumOfDistancesOnSmallFile :: FilePath -> IO IntsumOfDistancesOnSmallFile path = do fileContents <- readFile path return $ allDistances $ map read $ lines $ fileContents where allDistances (x:xs) = (allDistances xs) + ( sum $ map (distance x) xs) allDistances _ = 0--Setting up an enumerator of read values from a text streamreaderEnumerator :: Monad m =>Integral a => Reader a -> Step a m b -> Iteratee Text m breaderEnumerator reader = joinI . (EL.concatMapM transformer) where transformer input = case reader input of Right (val, remainder) -> return [val] Left err -> return [0]readEnumerator :: Monad m =>Integral a => Step a m b -> Iteratee Text m breadEnumerator = readerEnumerator (signed decimal)--The iteratee version of my operationdistancesFirstToTailIt :: Monad m=> Iteratee Int m IntdistancesFirstToTailIt = do maybeNum <- EL.head maybe (return 0) distancesOneToManyIt maybeNumdistancesOneToManyIt :: Monad m=> Int -> Iteratee Int m IntdistancesOneToManyIt base = do maybeNum <- EL.head maybe (return 0) combineNextDistance maybeNum where combineNextDistance nextNum = do rest <- distancesOneToManyIt base return $ combineDistances [(distance base nextNum),rest]--The mapreduce algorithmmapReduce :: Strategy b -- evaluation strategy for mapping -> (a -> b) -- map function -> Strategy c -- evaluation strategy for reduction -> ([b] -> c) -- reduce function -> [a] -- list to map over -> cmapReduce mapStrat mapFunc reduceStrat reduceFunc input = mapResult `pseq` reduceResult where mapResult = parMap mapStrat mapFunc input reduceResult = reduceFunc mapResult `using` reduceStrat--Applying the iteratee operation using mapreducesumOfDistancesOnFileWithIt :: FilePath -> IO IntsumOfDistancesOnFileWithIt path = chunkedFileEnum chunkByLinesTails (distancesUsingMapReduceIt) pathdistancesUsingMapReduceIt :: [Enumerator Text IO Int] -> IO IntdistancesUsingMapReduceIt = mapReduce rpar (runEnumeratorAsMapFunc) rpar (sumValuesAsReduceFunc) where runEnumeratorAsMapFunc :: Enumerator Text IO Int -> IO Int runEnumeratorAsMapFunc = (\source->run_ (source $$ readEnumerator $$ distancesFirstToTailIt)) sumValuesAsReduceFunc :: [IO Int] -> IO Int sumValuesAsReduceFunc = liftM sum . sequence--Working with (file)chunk enumerators:data ChunkSpec = CS{ chunkOffset :: !Int , chunkLength :: !Int } deriving (Eq,Show)chunkedFileEnum :: (NFData (a)) => MonadIO m => (FilePath-> IO [ChunkSpec]) -> ([Enumerator Text m b]->IO a) -> FilePath -> IO achunkedFileEnum chunkCreator funcOnChunks path = do (chunks, handles)<- chunkedEnum chunkCreator path r <- funcOnChunks chunks (rdeepseq r `seq` (return r)) `finally` mapM_ hClose handleschunkedEnum :: MonadIO m=> (FilePath -> IO [ChunkSpec]) -> FilePath -> IO ([Enumerator Text m b], [Handle])chunkedEnum chunkCreator path = do chunks <- chunkCreator path liftM unzip . forM chunks $ \spec -> do h <- openFile path ReadMode hSeek h AbsoluteSeek (fromIntegral (chunkOffset spec)) let chunk = ET.enumHandle h --Note:chunklength not taken into account, so just to EOF return (chunk,h)-- returns set of chunks representing tails . lines . readFilechunkByLinesTails :: FilePath -> IO[ChunkSpec]chunkByLinesTails path = do bracket (openFile path ReadMode) hClose $ \h-> do totalSize <- fromIntegral `liftM` hFileSize h let chunkSize = 1 findChunks offset = do let newOffset = offset + chunkSize hSeek h AbsoluteSeek (fromIntegral newOffset) let findNewline lineSeekOffset = do eof <- hIsEOF h if eof then return [CS offset (totalSize - offset)] else do bytes <- Str.hGet h 256 case Str.elemIndex '\n' bytes of Just n -> do nextChunks <- findChunks (lineSeekOffset + n + 1) return (CS offset (totalSize-offset):nextChunks) Nothing -> findNewline (lineSeekOffset + Str.length bytes) findNewline newOffset findChunks 0Btw, I'm runningHaskellPlatform 2011.2.0 on Mac OS X 10.6.7 (snow leopard)with the following packages:bytestring 0.9.1.10parallel 3.1.0.1enumerator 0.4.8 , with a manual here 解决方案 As the error says, there are too many open files. I expected Haskell to run most of the program sequentially, but some 'sparks' parallel. However, as sclv mentioned, Haskell always sparks the evaluations.This usually is not a problem in a pure functional program, but it is when dealing with IO (resources). I scaled the parallelism as described in the Real World Haskell book too far up. So my conclusion is to do parallelism only on a limited scale when dealing with IO resources within the sparks. In the pure functional part, excessive parallelism may succeed.Thus the answer to my post is, to not use MapReduce on the whole program, but within an inner pure functional part.To show where the program actually failed, i configured it with --enable-executable-profiling -p, build it, and ran it using +RTS -p -hc -L30. Because the executable fails immediately, there is no memory allocation profile. The resulting time allocation profile in the .prof file starts with the following: individual inheritedCOST CENTRE MODULE no. entries %time %alloc %time %allocMAIN MAIN 1 0 0.0 0.3 100.0 100.0 main Main 1648 2 0.0 0.0 50.0 98.9 sumOfDistancesOnFileWithIt MapReduceTest 1649 1 0.0 0.0 50.0 98.9 chunkedFileEnum MapReduceTest 1650 1 0.0 0.0 50.0 98.9 chunkedEnum MapReduceTest 1651 495 0.0 24.2 50.0 98.9 lineOffsets MapReduceTest 1652 1 50.0 74.6 50.0 74.6chunkedEnum returns IO ([Enumerator Text m b], [Handle]), and apparently receives 495 entries. The input file was a 2k line file, so the single entry on lineOffsets returned a list of 2000 offsets. There is not a single entry in distancesUsingMapReduceIt, so the actual work did not even start! 这篇关于为什么使用iteratee IO的我的Mapreduce实现(真实世界haskell)也失败,出现“太多打开的文件”的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持! 上岸,阿里云!
08-24 03:17