Problem description
I am working with Python 3.6.1 on Jupyter 5. My goal is to test how portalocker manages concurrent appends to the same file.
To do that, I wrote a simple function that appends a single line to the file, and I use multiprocessing.Pool and Pool.map() to run the function in parallel.
Here is the code in the Jupyter notebook.
Cell 1
from time import time
from multiprocessing import Pool
import portalocker

def f(*args):
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as f:
        portalocker.lock(f, portalocker.LOCK_EX)
        f.write(f'{time()}\n')
Cell 2
start = time()
with Pool(4) as p:
    p.map(f, range(4))
Cell 3
with open('portalocker_test.txt', 'r') as f:
    for line in f:
        print(line, end='')
If I run this code once, I get the expected result:
Cell 3 output:
1495614277.189394
1495614277.1893928
1495614277.1893911
1495614277.1894028
But if I run cell 2 again (without restarting the notebook), I get:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-db9c07d32724> in <module>()
1 start = time()
2 with Pool(4) as p:
----> 3 p.map(f, range(4))
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in map(self, func, iterable, chunksize)
258 in a list that is returned.
259 '''
--> 260 return self._map_async(func, iterable, mapstar, chunksize).get()
261
262 def starmap(self, func, iterable, chunksize=None):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in get(self, timeout)
606 return self._value
607 else:
--> 608 raise self._value
609
610 def _set(self, i, obj):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/pool.py in _handle_tasks(taskqueue, put, outqueue, pool, cache)
383 break
384 try:
--> 385 put(task)
386 except Exception as e:
387 job, ind = task[:2]
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/connection.py in send(self, obj)
204 self._check_closed()
205 self._check_writable()
--> 206 self._send_bytes(_ForkingPickler.dumps(obj))
207
208 def recv_bytes(self, maxlength=None):
/Users/xxx/Homebrew/Cellar/python3/3.6.1/Frameworks/Python.framework/Versions/3.6/lib/python3.6/multiprocessing/reduction.py in dumps(cls, obj, protocol)
49 def dumps(cls, obj, protocol=None):
50 buf = io.BytesIO()
---> 51 cls(buf, protocol).dump(obj)
52 return buf.getbuffer()
53
TypeError: cannot serialize '_io.TextIOWrapper' object
The same error is raised if I read the file before running cell 2. So if I never open the file before running cell 2, everything works; if I open it first, I get the error. This seems inconsistent to me. What is going on, and how do I fix it?
Also, the behavior is the same with or without portalocker, so portalocker is not the problem. I haven't checked this in plain Python, but I am really interested in running it in Jupyter.
Recommended answer
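The problem is that the same name is used for different objects. Cell 3 rebinds the global name f from your function to the open file object, so when you rerun cell 2, p.map receives a file object, which cannot be pickled. In your case, either of the following should help: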
- changing the function name from f to function (or any name other than f)
Cell 1
from time import time
from multiprocessing import Pool
import portalocker

def function(*args):
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as f:
        portalocker.lock(f, portalocker.LOCK_EX)
        f.write(f'{time()}\n')
Cell 2
start = time()
with Pool(4) as p:
    p.map(function, range(4))
or
- renaming the file objects obtained with open from f to file (or any name other than f):
Cell 1
from time import time
from multiprocessing import Pool
import portalocker

def f(*args):
    while time() < start + 1:
        pass
    with open('portalocker_test.txt', 'a') as file:
        portalocker.lock(file, portalocker.LOCK_EX)
        file.write(f'{time()}\n')
Cell 3
with open('portalocker_test.txt', 'r') as file:
    for line in file:
        print(line, end='')
or both.
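To see the name clash in isolation, here is a minimal sketch that reproduces it without multiprocessing or portalocker. It assumes a scratch file in a temporary directory (a hypothetical stand-in for portalocker_test.txt) and calls pickle.dumps directly, which is roughly what Pool.map does when it sends the task to a worker. The exact wording of the error message differs between Python versions, so the sketch only checks the exception type.

```python
import os
import pickle
import tempfile


def f(*args):
    """Stand-in for the worker function defined in cell 1."""
    return args


# After "cell 1", the global name f refers to a function.
assert callable(f)

# Hypothetical scratch file, standing in for portalocker_test.txt.
path = os.path.join(tempfile.mkdtemp(), "portalocker_test.txt")
with open(path, "w") as out:
    out.write("example line\n")

# "Cell 3": the with statement rebinds the global name f to the file object.
with open(path, "r") as f:
    lines = f.readlines()

# f still refers to the (now closed) file object, not the function.
rebound_type = type(f).__name__

# Rerunning "cell 2" would pass this object to Pool.map, which has to pickle it.
try:
    pickle.dumps(f)
    pickle_error = None
except TypeError as exc:
    pickle_error = exc

print(rebound_type)
print(repr(pickle_error))
```

Leaving a with block closes the file but does not unbind the name, which is why the clash persists across notebook cells until cell 1 is run again or the kernel is restarted.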