Deadlock caused by forking a multithreaded process

最近生产环境的一个模块突然卡住了,模块的作者已经去其他组了,没办法只能自己查问题了🤣。

该模块使用python的logging标准库记录日志,自定义kafka作为handler,日志通过kafka发送到Elasticsearch,然后就是ELK日志处理了。

模块为了提高吞吐,内部使用了多进程,问题恰恰就出现在了这里。

使用GDB attach到卡死的进程,执行py-bt命令,输出如下:

Traceback (most recent call first):

File “/usr/lib64/python2.7/multiprocessing/forking.py”, line 135, in poll

pid, sts = os.waitpid(self.pid, flag)

File “/usr/lib64/python2.7/multiprocessing/forking.py”, line 154, in wait

return self.poll(0)

File “/usr/lib64/python2.7/multiprocessing/process.py”, line 145, in join

res = self._popen.wait(timeout)

File “parser/parser_main.py”, line 76, in data_parse

process.join()

主进程里使用multiprocessing模块创建多进程执行任务,这里主进程在等待子进程执行结束。

阅读multiprocessing模块的源代码,可以发现其创建子进程时调用了os.fork()函数,该函数是python os模块对系统调用fork的封装。

子进程的输出:

Traceback (most recent call first):

Waiting for the GIL

File “sdk/python-lib/kafka/producer/record_accumulator.py”, line 223, in append

with self._tp_locks[tp]:

File “sdk/python-lib/kafka/producer/kafka.py”, line 516, in send

self.config[‘max_block_ms’])

File “sdk/python-lib/elk_logging/handlers/kafka.py”, line 37, in emit

self.format(record).encode(‘utf-8’))

elk_logging模块自定义了kafka handler,模块导入时通过logging.config.dictConfig()配置logging模块。kafak发送消息时获取不到锁self._tp_lock[tp],然后一直等待。

在Google上搜索“kafka self._tp_lock deadlock”没有发现有价值的信息,基本排除是kafka库的bug,如果是kafka库的bug,网上不太可能一点线索也没有。

前文我们提到这里使用了fork创建子进程,当内核执行fork时,生成的子进程是父进程的副本,子进程获得父进程的数据空间、堆和栈的副本(这里是子进程自己拥有的副本,并不与父进程共享这些存储空间。很多实现使用了写时复制技术,一开始这些区域被父子进程共享,如果任意一个进程修改这些区域,则内核只为修改的那块内存制作一个副本,通常是虚拟内存的一页)。

在Linux系统中调用fork,创建的子进程中只有一个线程——父进程中调用fork函数的线程。

Note the following further points:

  • The child process is created with a single thread—the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.

也不是所有的系统都是这样,Solaris系统libthread库提供的fork函数会“复制“所有线程。

Solaris libthread supports both fork() and fork1(). The fork() call has “fork-all” semantics–it duplicates everything in the process, including threads and LWPs, creating a true clone of the parent. The fork1() call creates a clone that has only one thread; the process state and address space are duplicated, but only the calling thread is cloned.

POSIX libpthread supports only fork(), which has the same semantics as fork1() in Solaris threads.

POSIX标准定义的行为是fork只“复制”一个线程。

那么这些与上面的死锁有什么关系?python标准库logging模块在创建logger时会调用到以下代码,可以看到一个logger只会实例化一次,对应的handler和formatter等也只会实例化一次。

logging/__init__.py manager.getLogger(name)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
_acquireLock()
try:
if name in self.loggerDict:
rv = self.loggerDict[name]
if isinstance(rv, PlaceHolder):
ph = rv
rv = (self.loggerClass or _loggerClass)(name)
rv.manager = self
self.loggerDict[name] = rv
self._fixupChildren(ph, rv)
self._fixupParents(rv)
else:
rv = (self.loggerClass or _loggerClass)(name)
rv.manager = self
self.loggerDict[name] = rv
self._fixupParents(rv)
finally:
_releaseLock()

如果自定义了handler,配置logging模块时会导入指定的handler class,并实例化。

logging/config.py DictConfigurator.configure_handler()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if '()' in config:
c = config.pop('()')
if not callable(c):
c = self.resolve(c)
factory = c
else:
cname = config.pop('class')
klass = self.resolve(cname)
factory = klass
props = config.pop('.', None)
kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
try:
result = factory(**kwargs)
except TypeError as te:
if "'stream'" not in str(te):
raise
kwargs['strm'] = kwargs.pop('stream')
result = factory(**kwargs)

父进程和fork出来的子进程使用的时相同的对象实例,相同的logger,相同的kafka handler。

kafka producer发送消息是多线程异步的。KafkaProducer实例化时会启动一个消息发送线程。有消息过来后,先通过RecordAccumulator类实例将消息追加到内部list,然后唤醒self._sender发送线程,发送线程通过RecordAccumulator类实例检查哪些可以发送,整个过程RecordAccumulator类实例维护了topic+partition相关的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def __init__(self, **configs):
...
self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
self._sender = Sender(client, self._metadata,
self._accumulator, self._metrics,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
self._sender.start()
self._closed = False

self._cleanup = self._cleanup_factory()
atexit.register(self._cleanup)
log.debug("Kafka producer started")

如果fork时RecordAccumulator类实例中某个topic+partition的锁还没有释放,子进程中该锁永远不会被释放,因为没有了发送线程,如果子进程中恰好向同一个topic和同一个partition发送消息就会死锁。

由于子进程不会常驻在后台,所以满足上述条件还是比较难的,从表现上来看,5个月里发生了一次死锁。

后记

避免这种问题,最好的办法就是不要fork一个多线程进程。

python的标准库logging模块曾经因为类似的原因出过问题。现在官方文档已经指出:

Note that safely forking a multithreaded process is problematic.

Google甚至还创建过一个项目python-atfork,模仿pthread_atfork,可以注册函数,保正多线程的情况下可以安全的fork。当然,该项目已经被废弃了。

如果使用较新的python版本(3.5+ ?)可以指定multiprocessing模块创建进程的方式,可以尝试spawn方式。

Ref