Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

浅谈python中的多进程 #62

Open
helios741 opened this issue Oct 17, 2019 · 0 comments
Open

浅谈python中的多进程 #62

helios741 opened this issue Oct 17, 2019 · 0 comments

Comments

@helios741
Copy link
Owner

helios741 commented Oct 17, 2019

一、为什么要实现多进程编程

浅谈python中的多线程编程中说到了,因为Python中的GIL机制,python的多线程不能充分利用多核CPU。

简单的说GIL就是对于一个进程来说,同时只能处理其中的一个线程

在python中,如果要充分利用计算机多核的特性或者说要进行高计算量的任务的话就要用的多进程。

多进程 多线程 备注
对资源要求 OS创建一个进程的耗费肯定要比创建一个线程浪费的多,具体可以参考我的两个笔记内核创建进程创建线程
高计算场景 不适用 适用 多线程在高计算的场景下容易阻塞,因为就用一个核
高I/O场景 适用 中度适用 虽然多进程也适用于高I/O,但是进程间通信慢类的原因,可是导致还不如多线程

等以后将完协程,会把这个表进一步完善。

二、多进程编程

1. 通过操作系统的fork函数

import os
print("not fork")
pid = os.fork()
print("already fork")

if pid == 0:
    print("my is child process id is: {}, father is: {}".format(os.getpid(), os.getppid()))
else:
    print("my is father, pid is :{}".format(pid))

output:

not fork
already fork
my is father, pid is :78348
already fork
my is child process id is: 78348, father is: 78347

在调用fork之后,操作系统就会产生一个新的进程,所有就会执行两次*pid = os.fork()*下面的逻辑:

  • 如果pid==0: 代表是子进程
  • 如果pid不等于0:代表是父进程

2. 通过multiprocess模块

import multiprocessing
import time
def foo(p):
    print("current process name is {}".format(multiprocessing.current_process().name))
    time.sleep(1)
    print("this is p: {}".format(p))

multiprocessing.Process(target=foo, name="test_process", args=["params"]).start()

print("main end")

ouput:

main end
current process name is test_process
this is p: params

很明显这个要主进程等到子进程结束才会结束,那么如果我们想要把子进程变为后台运行,可以看下面这个例子:

import multiprocessing
import time
def foo(p):
    print("current process name is {}".format(multiprocessing.current_process().name))
    time.sleep(1)
    print("this is p: {}".format(p))

p = multiprocessing.Process(target=foo, name="test_process", args=["params"])

p.daemon = True
p.start()
# time.sleep(2)
print("main end")

主进程会根据子进程的daemon属性来决定这个子进程的命运:

  • daemon: False 主进程会等子进程死了之后在退出
  • daemon: True 不管子进程运行到什么状态,直接退出

3. 通过继承multiprocessing.Process

import multiprocessing

class MyProcess(multiprocessing.Process):
    def __init__(self, name):
        super().__init__(name=name)
    def run(self):
        print("child process running")


p = MyProcess("test_process_class")
p.start()
print("child process name is: {}".format(p.name))
print("main end")

三、进程间管理状态

python的多进程模块提供了在多进程中共享信息的对象(Manager),该对象能保证当一个进程修改了共享对象的时候,所有的进程都能拿到更新后的共享对象。

import multiprocessing
import time
def worker(d, k, v):
    time.sleep(1)
    d[k] = v
    print("key = {}, value = {}".format(k, v))

mgr = multiprocessing.Manager()
dict = mgr.dict()

[multiprocessing.Process(target=worker, args=(dict, i, i*2)).start() for i in range(10)]
time.sleep(2)
print(dict)

output:

key = 1, value = 2
key = 2, value = 4
key = 0, value = 0
key = 3, value = 6
key = 4, value = 8
key = 6, value = 12
key = 5, value = 10
key = 7, value = 14
key = 8, value = 16
key = 9, value = 18
{1: 2, 2: 4, 0: 0, 3: 6, 4: 8, 6: 12, 5: 10, 7: 14, 8: 16, 9: 18}

三、进程通信

 四、进程同步

1. 通过Lock

下面的程序多次执行的话,结果可能会不同:

import multiprocessing
import time
def worker(d):
    v = d.get("k", 0)
    d["k"] = v + 1
mgr = multiprocessing.Manager()
dict = mgr.dict()
[multiprocessing.Process(target=worker, args=(dict,)).start() for i in range(10)]
time.sleep(2)
print(dict)

通过加锁的方式能够解决:

import multiprocessing
import time

def worker(d, lock):
    with lock:
        v = d.get("k", 0)
        d["k"] = v + 1

lock = multiprocessing.Lock()
mgr = multiprocessing.Manager()
dict = mgr.dict()
[multiprocessing.Process(target=worker, args=(dict, lock, )).start() for i in range(10)]
time.sleep(2)
print(dict)

2. 通过信号(Semaphore)

表示最多能够多有多少个进程操作共享资源:

import multiprocessing
import time
def worker(sema, i):
    sema.acquire()
    print(multiprocessing.current_process().name + " acquire")
    time.sleep(i)
    print(multiprocessing.current_process().name + " release")
    sema.release()
s = multiprocessing.Semaphore(2)
[multiprocessing.Process(target=worker, args=(s, i * 1.5)).start() for i in range(5)]

time.sleep(10)

3. 通过Event

内部维护一个变量,当变量为true时候,wait将被唤醒或者调用wait不被阻塞。
一个进程发送事件信号,一个进程等待事件信号。主要有下面三个API:

  • set:将内部变量设置为true,通知等待的事件的进程
  • wait(timeout=None): 阻塞进程直到内部变量为true。如果调用时内部标志为true,将立即返回。否则将阻塞进程,直到调用 set() 方法将标志设置为true或者发生可选的超时。
  • clear:将内部变量设置为false,其他进程调用wait方法将继续阻塞
import multiprocessing
import time

def event_set(e):
    time.sleep(2)
    print("start set")
    e.set()
    e.clear()
    print("end set")

def event_wait(e, i):
    time.sleep(i)
    print("start wait {}".format(i))
    e.wait()
    print("end wait {}".format(i))

eve = multiprocessing.Event()

multiprocessing.Process(target=event_wait, args=(eve, 1)).start()
multiprocessing.Process(target=event_set, args=(eve,)).start()
multiprocessing.Process(target=event_wait, args=(eve, 3)).start()

time.sleep(5)

结果是只有一个进程会被唤醒,其余的都继续等待。

4. 通过Condition

  • wait:用来等待进程
  • notify_all: 通知所有等待的进程

5. Barrier

栅栏类提供一个简单的同步原语,用于应对固定数量的进程需要彼此相互等待的情况。进程调用 wait() 方法后将阻塞,直到所有进程都调用了 wait() 方法。此时所有进程将被同时释放。

就是当一个任务需要多个进程同时合作完成的时候就派上了用场。

import multiprocessing
import time

def worker(bar, i):
    time.sleep(i)
    print("barrier {}  start".format(i))
    bar.wait()
    print("barrier  {} end".format(i))

bar = multiprocessing.Barrier(2)
multiprocessing.Process(target=worker, args=(bar, 1)).start()
multiprocessing.Process(target=worker, args=(bar, 2)).start()

time.sleep(3)

1. 通过queue

2.

总结

参考

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant