Browse Source

use ThreadPoolExecutor

j 2 months ago
parent
commit
ff7cee2be1
2 changed files with 50 additions and 83 deletions
  1. 12
    40
      oml/item/icons.py
  2. 38
    43
      oml/oxtornado.py

+ 12
- 40
oml/item/icons.py View File

@@ -8,8 +8,10 @@ import tornado.concurrent
8 8
 import tornado.gen
9 9
 import tornado.ioloop
10 10
 import tornado.web
11
+from concurrent.futures import ThreadPoolExecutor
12
+from tornado.concurrent import run_on_executor
13
+
11 14
 
12
-from oxtornado import run_async
13 15
 from settings import icons_db_path, static_path
14 16
 from utils import resize_image, is_svg
15 17
 import db
@@ -18,6 +20,9 @@ import db
18 20
 import logging
19 21
 logger = logging.getLogger(__name__)
20 22
 
23
+MAX_WORKERS = 4
24
+
25
+
21 26
 
22 27
 class Icons(dict):
23 28
     def __init__(self, db):
@@ -174,53 +179,20 @@ def get_icon_sync(id, type_, size):
174 179
         data = ''
175 180
     return data
176 181
 
177
-@run_async
178
-def get_icon(id, type_, size, callback):
179
-    callback(get_icon_sync(id, type_, size))
180
-
181 182
 def clear_default_cover_cache():
182 183
     icons.clear('default:cover:')
183 184
 
184
-@run_async
185
-def get_icon_app(id, type_, size, callback):
186
-    with db.session():
187
-        from item.models import Item
188
-        item = Item.get(id)
189
-        if not item:
190
-            data = ''
191
-        else:
192
-            if type_ == 'cover' and not item.meta.get('cover'):
193
-                type_ = 'preview'
194
-            if type_ == 'preview' and not item.files.count():
195
-                type_ = 'cover'
196
-            if size:
197
-                skey = '%s:%s:%s' % (type_, id, size)
198
-            key = '%s:%s' % (type_, id)
199
-            data = None
200
-            if size:
201
-                data = icons[skey]
202
-                if data:
203
-                    size = None
204
-            if not data:
205
-                data = icons[key]
206
-            if not data:
207
-                data = icons.default_cover()
208
-                size = None
209
-            if size:
210
-                try:
211
-                    data = resize_image(data, size=size)
212
-                    icons[skey] = data
213
-                except:
214
-                    pass
215
-            data = bytes(data) or ''
216
-    callback(data)
217 185
 
218 186
 class IconHandler(tornado.web.RequestHandler):
187
+    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
219 188
 
220 189
     def initialize(self):
221 190
         pass
222 191
 
223
-    @tornado.web.asynchronous
192
+    @run_on_executor
193
+    def get_icon(self, id, type_, size):
194
+        return get_icon_sync(id, type_, size)
195
+
224 196
     @tornado.gen.coroutine
225 197
     def get(self, id, type_, size=None):
226 198
 
@@ -232,7 +204,7 @@ class IconHandler(tornado.web.RequestHandler):
232 204
 
233 205
         self.set_header('Content-Type', 'image/jpeg')
234 206
 
235
-        response = yield tornado.gen.Task(get_icon, id, type_, size)
207
+        response = yield self.get_icon(id, type_, size)
236 208
         if not response:
237 209
             self.set_status(404)
238 210
             return

+ 38
- 43
oml/oxtornado.py View File

@@ -12,13 +12,15 @@ import tornado.ioloop
12 12
 import tornado.web
13 13
 import tornado.gen
14 14
 import tornado.concurrent
15
-from threading import Thread
16
-from functools import wraps
15
+from concurrent.futures import ThreadPoolExecutor
16
+from tornado.concurrent import run_on_executor
17
+
17 18
 
18 19
 import logging
19 20
 logger = logging.getLogger(__name__)
20 21
 
21 22
 
23
+MAX_WORKERS = 4
22 24
 
23 25
 def json_response(data=None, status=200, text='ok'):
24 26
     if not data:
@@ -37,15 +39,6 @@ def json_dumps(obj):
37 39
     indent = 2
38 40
     return json.dumps(obj, indent=indent, default=_to_json, ensure_ascii=False).encode()
39 41
 
40
-def run_async(func):
41
-    @wraps(func)
42
-    def async_func(*args, **kwargs):
43
-        func_hl = Thread(target=func, args=args, kwargs=kwargs)
44
-        func_hl.start()
45
-        return func_hl
46
-
47
-    return async_func
48
-
49 42
 def trim(docstring):
50 43
     if not docstring:
51 44
         return ''
@@ -75,44 +68,46 @@ def trim(docstring):
75 68
 def defaultcontext():
76 69
     yield
77 70
 
78
-@run_async
79
-def api_task(context, request, callback):
80
-    import settings
81
-    if context == None:
82
-        context = defaultcontext
83
-    action = request.arguments.get('action', [None])[0].decode('utf-8')
84
-    data = request.arguments.get('data', [b'{}'])[0]
85
-    data = json.loads(data.decode('utf-8')) if data else {}
86
-    if not action:
87
-        methods = list(actions.keys())
88
-        api = []
89
-        for f in sorted(methods):
90
-            api.append({'name': f,
91
-                        'doc': actions.doc(f).replace('\n', '<br>\n')})
92
-        response = json_response(api)
93
-    else:
94
-        if settings.DEBUG_API:
95
-            logger.debug('API %s %s', action, data)
96
-        f = actions.get(action)
97
-        if f:
98
-            with context():
99
-                try:
100
-                    response = f(data)
101
-                except:
102
-                    logger.debug('FAILED %s %s', action, data, exc_info=True)
103
-                    response = json_response(status=500, text='%s failed' % action)
104
-        else:
105
-            response = json_response(status=400, text='Unknown action %s' % action)
106
-    callback(response)
107
-
108 71
 class ApiHandler(tornado.web.RequestHandler):
72
+    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)
73
+
109 74
     def initialize(self, context=None):
110 75
         self._context = context
111 76
 
112 77
     def get(self):
113 78
         self.write('use POST')
114 79
 
115
-    @tornado.web.asynchronous
80
+    @run_on_executor
81
+    def api_task(self, request):
82
+        import settings
83
+        context = self._context
84
+        if context == None:
85
+            context = defaultcontext
86
+        action = request.arguments.get('action', [None])[0].decode('utf-8')
87
+        data = request.arguments.get('data', [b'{}'])[0]
88
+        data = json.loads(data.decode('utf-8')) if data else {}
89
+        if not action:
90
+            methods = list(actions.keys())
91
+            api = []
92
+            for f in sorted(methods):
93
+                api.append({'name': f,
94
+                            'doc': actions.doc(f).replace('\n', '<br>\n')})
95
+            response = json_response(api)
96
+        else:
97
+            if settings.DEBUG_API:
98
+                logger.debug('API %s %s', action, data)
99
+            f = actions.get(action)
100
+            if f:
101
+                with context():
102
+                    try:
103
+                        response = f(data)
104
+                    except:
105
+                        logger.debug('FAILED %s %s', action, data, exc_info=True)
106
+                        response = json_response(status=500, text='%s failed' % action)
107
+            else:
108
+                response = json_response(status=400, text='Unknown action %s' % action)
109
+        return response
110
+
116 111
     @tornado.gen.coroutine
117 112
     def post(self):
118 113
         if 'origin' in self.request.headers and self.request.host not in self.request.headers['origin']:
@@ -121,7 +116,7 @@ class ApiHandler(tornado.web.RequestHandler):
121 116
             self.write('')
122 117
             return
123 118
 
124
-        response = yield tornado.gen.Task(api_task, self._context, self.request)
119
+        response = yield self.api_task(self.request)
125 120
         if not 'status' in response:
126 121
             response = json_response(response)
127 122
         response = json_dumps(response)