How to coordinate the task chain in the celery-rabbitMQ framework to reduce the memory usage? -
1、 design tool hand traffic report of lte-network.i follow git-project :https://github.com/esperdyne/celery-message-processing.
the code work, yet memory highly occupied , therefore hope coordinate task chain of celery-rabbitmq.
2、 use- case model
in example have 2 use cases, let’s see use-case diagram below.
figure 4-1 use case model form above diagram, can see wish have timer , 2 jobs: 1, fetch traffic report files ftp servers, parse file ,pick information field wish , , storage files in database. 2, fetch traffic report files ftp servers, parse file ,pick information field wish. calculate total traffic of each cell, filer cell traffic volume bigger 100 m, , copy records database. let’s have @ use-case description of 2 use case. 1.2.1 storage traffic report 1、 timer visit ftp server every week , find out new arriving files. 2、it pick file, convert file intermediate stream in memory. 3, pick information necessary traffic dimension , ready intermediate stream. 4、the timer connect database , , copy intermediate stream data base. 5, when files handled, finish job.
1.2.2 filter high traffic cell 1、 when timer have prepare intermediate stream wish. forward stream filter. 2, inside filter, check each of cell, calculate traffic volume of cell, , find out cell traffic bigger 100 m. while records checked, timer have new stream. 4、the timer connect database , , copy intermediate stream data base.
2、 code
2.1 task queue
@app.task(base=pmtask, queue="parse") def parse(filename): """parse traffic report file. return list""" # call method in base task , return result return parse.parse_csv_file(filename) @app.task(base=pmtask, queue="pm_deploy", ignore_result=true) def deploy_pm(pm_list): """deploys message dataset list postgres-sql database table""" # call method in base task deploy_pm.database_pm_insert(pm_list) @app.task(base=pmtask, queue="filter_deploy", ignore_result=true) def deploy_filter(pm_list): """deploys data set list filter instance""" # call method in base task deploy_filter.database_filter_insert(pm_list)
2.2 def parse(filename)
it fetch traffic report file , transfer intermediate file , , ready list of required information field.
2.3 deploy_pm(pm_list)
def database_pm_insert(self, pm_list): """insert pm_list postgres database""" if self._pm_table none: self._init_database() ins = self._pm_table.insert(values=pm_list) ins.execute()
2.4 def deploy_filter(pm_list)
def database_filter_insert(self, pm_list): """insert pm_filter_list postgres database""" if self._filter_table none: self._init_database() filter_list = [d d in pm_list if d['upoctul'] + d['upoctdl'] > 100] ins = self._filter_table.insert(values=filter_list) ins.execute()
3、the problem meet while test code, meet problem, if traffic report file size big, there many records in file, capacity in object pm_list big, memory in high load , fail.
i guess problem this: 1、 worker parser works fast, , worker deploy_pm , deploy_filter works slow, there many instances of deploy_pm , deploy_filter remain in memory. why exceed limit of memory.
i not know understanding right or no, hope give me advice on schedule of task view.
4、in test case
i use 8 files, , machine mac book 16g memories. while each file have 170000 records.
the error log can seen here:
Comments
Post a Comment