日韩av黄I国产麻豆传媒I国产91av视频在线观看I日韩一区二区三区在线看I美女国产在线I麻豆视频国产在线观看I成人黄色短片

歡迎訪問 生活随笔!

生活随笔

當前位置: 首頁 >

python并行编程语言_Python3 系列之 并行编程

發布時間:2025/3/21 47 豆豆
生活随笔 收集整理的這篇文章主要介紹了 python并行编程语言_Python3 系列之 并行编程 小編覺得挺不錯的,現在分享給大家,幫大家做個參考.

Python

Python開發

Python語言

Python3 系列之 并行編程

進程和線程

進程是程序運行的實例。一個進程里面可以包含多個線程,因此同一進程下的多個線程之間可以共享線程內的所有資源,它是操作系統動態運行的基本單元;每一個線程是進程下的一個實例,可以動態調度和獨立運行,由于線程和進程有很多類似的特點,因此,線程又被稱為輕量級的進程。線程的運行在進程之下,進程的存在依賴于線程;

開胃菜

基于 Python3 創建一個簡單的進程示例

from threading import Thread

from time import sleep

class CookBook(Thread):

def __init__(self):

Thread.__init__(self)

self.message = "Hello Parallel Python CookBook!!n"

def print_message(self):

print(self.message)

def run(self):

print("Thread Startingn")

x = 0

while x < 10:

self.print_message()

sleep(2)

x += 1

print("Thread Ended!n")

print("Process Started")

hello_python = CookBook()

hello_python.start()

print("Process Ended")

需要注意的是,永遠不要讓線程在后臺默默執行,當其執行完畢后要及時釋放資源。

基于線程的并行

多線程編程一般使用共享內存空間進行線程間的通信,這就使管理內存空間成為多線程編程的關鍵。Python 通過標準庫 threading 模塊來管理線程,具有以下的組件:

線程對象

Lock 對象

RLock 對象

信號對象

條件對象

事件對象

定義一個線程

基本語法

示例代碼如下所示

import threading

def function(i):

print("function called by thread: {0}".format(i))

return

threads = []

for i in range(5):

t = threading.Thread(target=function, args=(i,))

threads.append(t)

t.start()

lambda t, threads: t.join()

需要注意的是,線程創建后并不會自動運行,需要主動調用 start() 方法來啟動線程,join() 會讓調用它的線程被阻塞直到執行結束。(PS:可通過調用 t.setDaemon(True) 使其為后臺線程避免主線程被阻塞)

線程定位

示例代碼如下所示

import threading

import time

def first_function():

print("{0} is starting".format(threading.currentThread().getName()))

time.sleep(2)

print("{0} is Exiting".format(threading.currentThread().getName()))

def second_function():

print("{0} is starting".format(threading.currentThread().getName()))

time.sleep(2)

print("{0} is Exiting".format(threading.currentThread().getName()))

def third_function():

print("{0} is starting".format(threading.currentThread().getName()))

time.sleep(2)

print("{0} is Exiting".format(threading.currentThread().getName()))

if __name__ == "__main__":

t1 = threading.Thread(target=first_function,name="first")

t2 = threading.Thread(target=second_function,name="second")

t3 = threading.Thread(target=third_function,name="third")

t1.start()

t2.start()

t3.start()

t1.join()

t2.join()

t3.join()

通過設置 threading.Thread() 函數的 name 參數來設置線程名稱,通過 threading.currentThread().getName() 來獲取當前線程名稱;線程的默認名稱會以 Thread-{i} 格式來定義

自定義一個線程對象

示例代碼如下所示

import threading

import time

exitFlag = 0

class myThread(threading.Thread):

def __init__(self, threadID, name, counter):

threading.Thread.__init__(self)

self.threadID = threadID

self.name = name

self.counter = counter

def run(self):

print("Starting:{0}".format(self.name))

print_time(self.name, self.counter, 5)

print("Exiting:{0}".format(self.name))

def print_time(threadName, delay, counter):

while counter:

if exitFlag:

thread.exit()

time.sleep(delay)

print("{0} {1}".format(threadName, time.ctime(time.time())))

counter -= 1

t1 = myThread(1, "Thread-1", 1)

t2 = myThread(2, "Thread-2", 1)

t1.start()

t2.start()

t1.join()

t2.join()

print("Exiting Main Thread.")

如果想自定義一個線程對象,首先就是要定義一個繼承 threading.Thread 類的子類,實現構造函數, 并重寫 run() 方法即可。

線程同步

Lock

示例代碼如下所示

import threading

shared_resource_with_lock = 0

shared_resource_with_no_lock = 0

COUNT = 100000

shared_resource_lock = threading.Lock()

def increment_with_lock():

global shared_resource_with_lock

for i in range(COUNT):

shared_resource_lock.acquire()

shared_resource_with_lock += 1

shared_resource_lock.release()

def decrement_with_lock():

global shared_resource_with_lock

for i in range(COUNT):

shared_resource_lock.acquire()

shared_resource_with_lock -= 1

shared_resource_lock.release()

def increment_without_lock():

global shared_resource_with_no_lock

for i in range(COUNT):

shared_resource_with_no_lock += 1

def decrement_wthout_lock():

global shared_resource_with_no_lock

for i in range(COUNT):

shared_resource_with_no_lock -= 1

if __name__ == "__main__":

t1 = threading.Thread(target=increment_with_lock)

t2 = threading.Thread(target=decrement_with_lock)

t3 = threading.Thread(target=increment_without_lock)

t4 = threading.Thread(target=decrement_wthout_lock)

t1.start()

t2.start()

t3.start()

t4.start()

t1.join()

t2.join()

t3.join()

t4.join()

print("the value of shared variable with lock management is :{0}".format(

shared_resource_with_lock))

print("the value of shared variable with race condition is :{0}".format(

shared_resource_with_no_lock))

通過 threading.Lock() 方法我們可以拿到線程鎖,一般有兩種操作方式:acquire() 和 release() 在兩者之間是加鎖狀態,如果釋放失敗的話會顯示 RuntimError() 的異常。

RLock

RLock 也叫遞歸鎖,和 Lock 的區別在于:誰拿到誰釋放,是通過 threading.RLock() 來拿到的;

示例代碼如下所示

import threading

import time

class Box(object):

lock = threading.RLock()

def __init__(self):

self.total_items = 0

def execute(self, n):

Box.lock.acquire()

self.total_items += n

Box.lock.release()

def add(self):

Box.lock.acquire()

self.execute(1)

Box.lock.release()

def remove(self):

Box.lock.acquire()

self.execute(-1)

Box.lock.release()

def adder(box, items):

while items > 0:

print("adding 1 item in the box")

box.add()

time.sleep(1)

items -= 1

def remover(box, items):

while items > 0:

print("removing 1 item in the box")

box.remove()

time.sleep(1)

items -= 1

if __name__ == "__main__":

items = 5

print("putting {0} items in the box".format(items))

box = Box()

t1 = threading.Thread(target=adder, args=(box, items))

t2 = threading.Thread(target=remover, args=(box, items))

t1.start()

t2.start()

t1.join()

t2.join()

print("{0} items still remain in the box".format(box.total_items))

信號量

示例代碼如下所示

import threading

import time

import random

semaphore = threading.Semaphore(0)

def consumer():

print("Consumer is waiting.")

semaphore.acquire()

print("Consumer notify:consumed item numbers {0}".format(item))

def producer():

global item

time.sleep(10)

item = random.randint(0, 10000)

print("producer notify:produced item number {0}".format(item))

semaphore.release()

if __name__ == "__main__":

for i in range(0, 5):

t1 = threading.Thread(target=producer)

t2 = threading.Thread(target=consumer)

t1.start()

t2.start()

t1.join()

t2.join()

print("program terminated.")

信號量初始化為 0 ,然后在兩個并行線程中,通過調用 semaphore.acquire() 函數會阻塞消費者線程,直到 semaphore.release() 在生產者中被調用,這里模擬了生產者-消費者 模式來進行了測試;如果信號量的計數器到了0,就會阻塞 acquire() 方法,直到得到另一個線程的通知。如果信號量的計數器大于0,就會對這個值-1然后分配資源。

使用條件進行線程同步

解釋條件機制最好的例子還是生產者-消費者問題。在本例中,只要緩存不滿,生產者一直向緩存生產;只要緩存不空,消費者一直從緩存取出(之后銷毀)。當緩沖隊列不為空的時候,生產者將通知消費者;當緩沖隊列不滿的時候,消費者將通知生產者。

示例代碼如下所示

from threading import Thread, Condition

import time

items = []

condition = Condition()

class consumer(Thread):

def __init__(self):

Thread.__init__(self)

def consume(self):

global condition

global items

condition.acquire()

if len(items) == 0:

condition.wait()

print("Consumer notify:no item to consum")

items.pop()

print("Consumer notify: consumed 1 item")

print("Consumer notify: item to consume are:{0}".format(len(items)))

condition.notify()

condition.release()

def run(self):

for i in range(0, 20):

time.sleep(2)

self.consume()

class producer(Thread):

def __init__(self):

Thread.__init__(self)

def produce(self):

global condition

global items

condition.acquire()

if len(items) == 10:

condition.wait()

print("Producer notify:items producted are:{0}".format(len(items)))

print("Producer notify:stop the production!!")

items.append(1)

print("Producer notify:total items producted:{0}".format(len(items)))

condition.notify()

condition.release()

def run(self):

for i in range(0, 20):

time.sleep(1)

self.produce()

if __name__ == "__main__":

producer = producer()

consumer = consumer()

producer.start()

consumer.start()

producer.join()

consumer.join()

通過 condition.acquire() 來獲取鎖對象,condition.wait() 會使當前線程進入阻塞狀態,直到收到 condition.notify() 信號,同時,調用信號的通知的對象也要及時調用 condition.release() 來釋放資源;

使用事件進行線程同步

事件是線程之間用于通信的對。有的線程等待信號,有的線程發出信號。

示例代碼如下所示

import time

from threading import Thread, Event

import random

items = []

event = Event()

class consumer(Thread):

def __init__(self, items, event):

Thread.__init__(self)

self.items = items

self.event = event

def run(self):

while True:

time.sleep(2)

self.event.wait()

item = self.items.pop()

print('Consumer notify:{0} popped from list by {1}'.format(

item, self.name))

class producer(Thread):

def __init__(self, integers, event):

Thread.__init__(self)

self.items = items

self.event = event

def run(self):

global item

for i in range(100):

time.sleep(2)

item = random.randint(0, 256)

self.items.append(item)

print('Producer notify: item N° %d appended to list by %s' %

(item, self.name))

print('Producer notify: event set by %s' % self.name)

self.event.set()

print('Produce notify: event cleared by %s ' % self.name)

self.event.clear()

if __name__ == "__main__":

t1 = producer(items, event)

t2 = consumer(items, event)

t1.start()

t2.start()

t1.join()

t2.join()

使用 with 語法簡化代碼

import threading

import logging

logging.basicConfig(level=logging.DEBUG,

format='(%(threadName)-10s) %(message)s')

def threading_with(statement):

with statement:

logging.debug("%s acquired via with" % statement)

def Threading_not_with(statement):

statement.acquire()

try:

logging.debug("%s acquired directly " % statement)

finally:

statement.release()

if __name__ == "__main__":

lock = threading.Lock()

rlock = threading.RLock()

condition = threading.Condition()

mutex = threading.Semaphore(1)

threading_synchronization_list = [lock, rlock, condition, mutex]

for statement in threading_synchronization_list:

t1 = threading.Thread(target=threading_with, args=(statement,))

t2 = threading.Thread(target=Threading_not_with, args=(statement,))

t1.start()

t2.start()

t1.join()

t2.join()

使用 queue 進行線程通信

Queue 常用的方法有以下四個:

put():往 queue 中添加一個元素

get():從 queue 中刪除一個元素,并返回該元素

task_done():每次元素被處理的時候都需要調用這個方法

join():所有元素都被處理之前一直阻塞from threading import Thread, Event

from queue import Queue

import time

import random

class producer(Thread):

def __init__(self, queue):

Thread.__init__(self)

self.queue = queue

def run(self):

for i in range(10):

item = random.randint(0, 256)

self.queue.put(item)

print("Producer notify: item item N° %d appended to queue by %s" %

(item, self.name))

time.sleep(1)

class consumer(Thread):

def __init__(self, queue):

Thread.__init__(self)

self.queue = queue

def run(self):

while True:

item = self.queue.get()

print('Consumer notify : %d popped from queue by %s' %

(item, self.name))

self.queue.task_done()

if __name__ == "__main__":

queue = Queue()

t1 = producer(queue)

t2 = consumer(queue)

t3 = consumer(queue)

t4 = consumer(queue)

t1.start()

t2.start()

t3.start()

t4.start()

t1.join()

t2.join()

t3.join()

t4.join()

基于進程的并行

multiprocessing 是 Python 標準庫中的模塊,實現了共享內存機制。

異步編程

使用 concurrent.futures 模塊

該模塊具有線程池和進程池,管理并行編程任務、處理非確定性的執行流程、進程/線程同步等功能;此模塊由以下部分組成

concurrent.futures.Executor: 這是一個虛擬基類,提供了異步執行的方法。

submit(function, argument): 調度函數(可調用的對象)的執行,將 argument 作為參數傳入。

map(function, argument): 將 argument 作為參數執行函數,以 異步 的方式。

shutdown(Wait=True): 發出讓執行者釋放所有資源的信號。

concurrent.futures.Future: 其中包括函數的異步執行。Future對象是submit任務(即帶有參數的functions)到executor的實例。

示例代碼如下所示

import concurrent.futures

import time

number_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

def evaluate_item(x):

result_item = count(x)

return result_item

def count(number):

for i in range(0, 1000000):

i = i + 1

return i * number

if __name__ == "__main__":

# 順序執行

start_time = time.time()

for item in number_list:

print(evaluate_item(item))

print("Sequential execution in " + str(time.time() - start_time), "seconds")

# 線程池執行

start_time_1 = time.time()

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:

futures = [executor.submit(evaluate_item, item)

for item in number_list]

for future in concurrent.futures.as_completed(futures):

print(future.result())

print("Thread pool execution in " +

str(time.time() - start_time_1), "seconds")

# 線程池執行

start_time_2 = time.time()

with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor:

futures = [executor.submit(evaluate_item, item)

for item in number_list]

for future in concurrent.futures.as_completed(futures):

print(future.result())

print("Process pool execution in " +

str(time.time() - start_time_2), "seconds")

使用 Asyncio 管理事件循環

Python 的 Asyncio 模塊提供了管理事件、協程、任務和線程的方法,以及編寫并發代碼的原語。此模塊的主要組件和概念包括:

事件循環: 在Asyncio模塊中,每一個進程都有一個事件循環。

協程: 這是子程序的泛化概念。協程可以在執行期間暫停,這樣就可以等待外部的處理(例如IO)完成之后,從之前暫停的地方恢復執行。

Futures: 定義了 Future 對象,和 concurrent.futures 模塊一樣,表示尚未完成的計算。

Tasks: 這是Asyncio的子類,用于封裝和管理并行模式下的協程。

Asyncio 提供了以下方法來管理事件循環:

loop = get_event_loop(): 得到當前上下文的事件循環。

loop.call_later(time_delay, callback, argument): 延后 time_delay 秒再執行 callback 方法。

loop.call_soon(callback, argument): 盡可能快調用 callback, call_soon() 函數結束,主線程回到事件循環之后就會馬上調用 callback 。

loop.time(): 以float類型返回當前時間循環的內部時間。

asyncio.set_event_loop(): 為當前上下文設置事件循環。

asyncio.new_event_loop(): 根據此策略創建一個新的時間循環并返回。

loop.run_forever(): 在調用 stop() 之前將一直運行。

示例代碼如下所示

import asyncio

import datetime

import time

def fuction_1(end_time, loop):

print("function_1 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, fuction_2, end_time, loop)

else:

loop.stop()

def fuction_2(end_time, loop):

print("function_2 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, function_3, end_time, loop)

else:

loop.stop()

def function_3(end_time, loop):

print("function_3 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, fuction_1, end_time, loop)

else:

loop.stop()

def function_4(end_time, loop):

print("function_4 called")

if(loop.time() + 1.0) < end_time:

loop.call_later(1, function_4, end_time, loop)

else:

loop.stop()

loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0

loop.call_soon(fuction_1, end_loop, loop)

loop.run_forever()

loop.close()

使用 Asyncio 管理協程

示例代碼如下所示

import asyncio

import time

from random import randint

@asyncio.coroutine

def StartState():

print("Start State called n")

input_val = randint(0, 1)

time.sleep(1)

if input_val == 0:

result = yield from State2(input_val)

else:

result = yield from State1(input_val)

print("Resume of the Transition:nStart State calling" + result)

@asyncio.coroutine

def State1(transition_value):

outputVal = str("State 1 with transition value=%s n" % (transition_value))

input_val = randint(0, 1)

time.sleep(1)

print("...Evaluating...")

if input_val == 0:

result = yield from State3(input_val)

else:

result = yield from State2(input_val)

@asyncio.coroutine

def State2(transition_value):

outputVal = str("State 2 with transition value= %s n" %

(transition_value))

input_Val = randint(0, 1)

time.sleep(1)

print("...Evaluating...")

if (input_Val == 0):

result = yield from State1(input_Val)

else:

result = yield from State3(input_Val)

result = "State 2 calling " + result

return outputVal + str(result)

@asyncio.coroutine

def State3(transition_value):

outputVal = str("State 3 with transition value = %s n" %

(transition_value))

input_val = randint(0, 1)

time.sleep(1)

print("...Evaluating...")

if(input_val == 0):

result = yield from State1(input_val)

else:

result = yield from State2(input_val)

result = "State 3 calling " + result

return outputVal + str(result)

@asyncio.coroutine

def EndState(transition_value):

outputVal = str("End State With transition value = %s n" %

(transition_value))

print("...Stop Computation...")

return outputVal

if __name__ == "__main__":

print("Finites State Machine simulation with Asyncio Coroutine")

loop = asyncio.get_event_loop()

loop.run_until_complete(StartState())

使用 Asyncio 控制任務

示例代碼如下所示

import asyncio

@asyncio.coroutine

def factorial(number):

f = 1

for i in range(2, number + 1):

print("Asyncio.Task:Compute factorial(%s)" % (i))

yield from asyncio.sleep(1)

f *= i

print("Asyncio.Task - factorial(%s) = %s" % (number, f))

@asyncio.coroutine

def fibonacci(number):

a, b = 0, 1

for i in range(number):

print("Asyncio.Task:Complete fibonacci (%s)" % (i))

yield from asyncio.sleep(1)

a, b = b, a+b

print("Asyncio.Task - fibonaci (%s)= %s" % (number, a))

@asyncio.coroutine

def binomialCoeff(n, k):

result = 1

for i in range(1, k+1):

result = result * (n-i+1) / i

print("Asyncio.Task:Compute binomialCoeff (%s)" % (i))

yield from asyncio.sleep(1)

print("Asyncio.Task - binomialCoeff (%s,%s) = %s" % (n, k, result))

if __name__ == "__main__":

tasks = [asyncio.Task(factorial(10)), asyncio.Task(

fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

使用Asyncio和Futures

示例代碼如下所示

import asyncio

import sys

@asyncio.coroutine

def first_coroutine(future, N):

count = 0

for i in range(1, N + 1):

count = count + i

yield from asyncio.sleep(4)

future.set_result(

"first coroutine (sum of N integers) result = " + str(count))

@asyncio.coroutine

def second_coroutine(future, N):

count = 1

for i in range(2, N + 1):

count *= i

yield from asyncio.sleep(3)

future.set_result("second coroutine (factorial) result = " + str(count))

def got_result(future):

print(future.result())

if __name__ == "__main__":

N1 = 1

N2 = 1

loop = asyncio.get_event_loop()

future1 = asyncio.Future()

future2 = asyncio.Future()

tasks = [

first_coroutine(future1, N1),

second_coroutine(future2, N2)

]

future1.add_done_callback(got_result)

future2.add_done_callback(got_result)

loop.run_until_complete(asyncio.wait(tasks))

loop.close()

分布式編程

GPU 編程

相關參考

內容來源于網絡,如有侵權請聯系客服刪除

總結

以上是生活随笔為你收集整理的python并行编程语言_Python3 系列之 并行编程的全部內容,希望文章能夠幫你解決所遇到的問題。

如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。