Distributed processes
A distributed process is one whose work is distributed across multiple machines, making full use of the performance of several machines to complete complex tasks. Between threads and processes, processes should be preferred: a process is more stable, and processes can be distributed across multiple machines, whereas threads can at most be distributed across multiple CPUs of the same machine.
Python's multiprocessing module not only supports multiple processes; its managers submodule also supports distributing processes across multiple machines. A service process can act as a dispatcher that distributes tasks to several other processes, relying on network communication. Because the managers module is well encapsulated, you can write distributed multiprocess programs easily without knowing the details of the network communication.
For example, when writing a crawler we often meet this scenario: we want to grab the link addresses of images, store the links in a Queue, and have another process read the links from the Queue, download the images, and store them locally. Now make this process distributed: a process on one machine is responsible for grabbing the links, and processes on other machines are responsible for downloading and storing them. The main problem is then exposing the Queue to the network so that processes on other machines can access it; the distributed process machinery encapsulates exactly this step, which we can call networking the queue.
Creating a distributed process requires a service process and a task process.

Creating the service process:
1. Create the queues (Queue) used for inter-process communication. The service process creates a task queue task_queue as the channel for passing tasks to the task process, and a result queue result_queue as the channel through which the task process replies to the service process after completing a task. In a distributed multiprocessing environment, tasks must be added through the Queue interface obtained from the QueueManager.
2. Register the queues created in step 1 on the network to expose them to other processes (hosts). After registration you obtain the network queue, which is like a mirror image of the local queue.
3. Create an instance manager of the QueueManager (a subclass of BaseManager), bind the port, and set the authentication password.
4. Start the instance created in step 3, i.e. start the manager, which supervises the information channel.
5. Through methods of the manager instance, obtain Queue objects accessible over the network, i.e. materialize the network queue as a local queue that can be used.
6. Put tasks into the "local" queue; they are automatically uploaded to the network queue and assigned to a task process to handle.
Note: the code here targets the Windows operating system; on Linux the details differ slightly.
# coding:utf-8
# taskManager.py for win
import Queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

# the number of tasks
task_num = 10

# define the send (task) and receive (result) queues
task_queue = Queue.Queue(task_num)
result_queue = Queue.Queue(task_num)

def get_task():
    return task_queue

def get_result():
    return result_queue

# subclass BaseManager as QueueManager
class QueueManager(BaseManager):
    pass

def win_run():
    # on Windows the binding interface cannot use a lambda,
    # so the functions must be defined first and then bound by name
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # bind the port and set the authentication password; Windows needs the
    # IP address filled in, Linux can leave it empty to default to localhost
    manager = QueueManager(address=('127.0.0.1', 4000), authkey='qty')
    # start the manager
    manager.start()
    # obtain the task queue and the result queue through the network
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    try:
        # add tasks
        for i in range(10):
            print 'put task %s...' % i
            task.put(i)
        print 'try get result...'
        for i in range(10):
            print 'result is %s' % result.get(timeout=10)
    except:
        print 'manage error'
    finally:
        # be sure to shut down, otherwise an error that the manager
        # was not closed will be reported
        manager.shutdown()
        print 'master exit!'
if __name__ == '__main__':
    # multiprocessing may have problems on Windows; this line alleviates them
    freeze_support()
    win_run()
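The listing above is Python 2 only (the Queue module, print statements, a str authkey). As a minimal sketch, not part of the original article, the same service process on Python 3 could look like this; the port 4000 and the password are carried over, and the main differences are that Queue is renamed queue, print is a function, and authkey must be bytes (the file name task_manager_py3.py is an assumption):

# task_manager_py3.py: a Python 3 sketch of the service process
import queue  # the Python 2 Queue module was renamed to queue
from multiprocessing.managers import BaseManager
from multiprocessing import freeze_support

task_num = 10
task_queue = queue.Queue(task_num)
result_queue = queue.Queue(task_num)

def get_task():
    return task_queue

def get_result():
    return result_queue

class QueueManager(BaseManager):
    pass

def run():
    # module-level functions (not lambdas) keep this portable to Windows
    QueueManager.register('get_task_queue', callable=get_task)
    QueueManager.register('get_result_queue', callable=get_result)
    # authkey must be bytes in Python 3
    manager = QueueManager(address=('127.0.0.1', 4000), authkey=b'qty')
    manager.start()
    task = manager.get_task_queue()
    result = manager.get_result_queue()
    try:
        for i in range(10):
            print('put task %s...' % i)
            task.put(i)
        print('try get result...')
        for i in range(10):
            print('result is %s' % result.get(timeout=10))
    finally:
        # always shut the manager down
        manager.shutdown()
        print('master exit!')

if __name__ == '__main__':
    freeze_support()
    run()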
The task process:
1. Use QueueManager to register the method names used to obtain the Queues; the task process can only obtain the Queues on the network by these names.
2. Connect to the server; the port and authentication password must be exactly consistent with those of the service process.
3. Obtain the Queues from the network and localize them.
4. Get tasks from the task queue, and put the results into the result queue.
The code below implements these steps.
# coding:utf-8
import time
from multiprocessing.managers import BaseManager

# subclass BaseManager as QueueManager
class QueueManager(BaseManager):
    pass

# step 1: register the method names used to obtain the Queues; the task
# process can only obtain the Queues on the network by these names
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# step 2: connect to the server
server_addr = '127.0.0.1'
print "Connect to server %s" % server_addr
# note: the port and authentication password must match the service process exactly
m = QueueManager(address=(server_addr, 4000), authkey='qty')
# connect over the network
m.connect()

# step 3: obtain the Queue objects
task = m.get_task_queue()
result = m.get_result_queue()

# step 4: get tasks from the task queue and write the results into the result queue
while not task.empty():
    index = task.get(True, timeout=10)
    print 'run task download %s' % str(index)
    result.put('%s---->success' % str(index))

# processing finished
print 'worker exit.'
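For completeness, here is a matching Python 3 sketch of the task process, under the same assumptions as the service sketch above (bytes authkey, print as a function; the file name task_worker_py3.py is an assumption). In the crawler scenario, the loop body is where the real download would go:

# task_worker_py3.py: a Python 3 sketch of the task process
from multiprocessing.managers import BaseManager

class QueueManager(BaseManager):
    pass

# register by name only; the queue objects live in the service process
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

server_addr = '127.0.0.1'
print('Connect to server %s' % server_addr)
# address and authkey must match the service process exactly
m = QueueManager(address=(server_addr, 4000), authkey=b'qty')
m.connect()

task = m.get_task_queue()
result = m.get_result_queue()

while not task.empty():
    index = task.get(True, timeout=10)
    print('run task download %s' % index)
    # in the crawler scenario, the actual download would happen here
    result.put('%s---->success' % index)

print('worker exit.')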
Execution results
First, run the service process, which outputs:
put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
Then immediately run the task process; it must be started promptly, otherwise the service process's result.get(timeout=10) will time out before any results arrive:
Connect to server 127.0.0.1
run task download 0
run task download 1
run task download 2
run task download 3
run task download 4
run task download 5
run task download 6
run task download 7
run task download 8
run task download 9
worker exit.
Finally, look back at the service process window:
put task 0...
put task 1...
put task 2...
put task 3...
put task 4...
put task 5...
put task 6...
put task 7...
put task 8...
put task 9...
try get result...
result is 0---->success
result is 1---->success
result is 2---->success
result is 3---->success
result is 4---->success
result is 5---->success
result is 6---->success
result is 7---->success
result is 8---->success
result is 9---->success
master exit!
This is simple but genuinely distributed computing: change the code a little and start multiple workers, and the tasks can be distributed to several or even dozens of machines, implementing a large-scale distributed crawler.
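Concretely, two small changes are enough to leave a single machine. The sketch below uses an illustrative LAN address (192.168.1.100 is an assumption; substitute the real IP of the machine running the service process) and a hypothetical launcher for starting several local copies of the worker sketch above; on a real cluster you would simply run the worker script on each machine:

# In the service process, bind to an address other machines can reach
# (192.168.1.100 is an illustrative LAN IP, not from the original article):
manager = QueueManager(address=('192.168.1.100', 4000), authkey=b'qty')

# In each task process, on any machine, connect to that same address:
m = QueueManager(address=('192.168.1.100', 4000), authkey=b'qty')
m.connect()

# A hypothetical local launcher: start four copies of the worker sketch
# (task_worker_py3.py above) and wait for all of them to finish.
import subprocess
import sys
workers = [subprocess.Popen([sys.executable, 'task_worker_py3.py'])
           for _ in range(4)]
for w in workers:
    w.wait()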