–
import time
import asyncio
import requests
import threading
import tornado.ioloop
import tornado.web
from tornado import httpclient

# Environment setup notes (building Python 3.7.3 and installing pycurl/tornado):
#yum -y install zlib zlib-devel libffi-devel openssl openssl-devel readline-devel readline patch gcc gcc-c++ libcurl-devel
#./configure --prefix=/usr/local/Python-3.7.3 --enable-shared --enable-optimizations
#make && make install
#vim /etc/ld.so.conf.d/python-3.7.3.conf
#/usr/local/Python-3.7.3/lib/
#ldconfig
#python3.7 -m pip install --no-cache-dir --compile --ignore-installed --install-option="--with-nss" pycurl
#python3.7 -m pip install tornado requests

# Use the libcurl-backed HTTP client instead of Tornado's default one.
httpclient.AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient"
)

# Event loop running in the worker thread; assigned by main() once it starts.
now_loop = None
# Set by main() right after now_loop is assigned, so the launching thread can
# wait for the loop instead of guessing with a fixed sleep.
loop_ready = threading.Event()
# Upper bound (seconds) on how long the launching thread waits for the loop.
LOOP_START_TIMEOUT = 5.0


class AsyncioTest(object):
    """Periodic poller that prints its identity and fetches a URL forever."""

    def __init__(self, name, id, delay):
        """Store the poller's label, numeric id and sleep interval.

        Args:
            name: Label printed on every iteration.
            id: Identifier printed on every iteration (the parameter name
                shadows the builtin, but is kept for caller compatibility).
            delay: Seconds to sleep before each fetch.
        """
        self.name = name
        self.id = id
        self.delay = delay

    async def callname(self, url):
        """Loop forever: sleep, print this poller's identity, fetch *url*.

        Fetch failures are printed and the loop continues, so this coroutine
        never returns on its own.

        Args:
            url: Address passed to ``AsyncHTTPClient.fetch`` on each iteration.
        """
        # Tornado hands out one shared client per IOLoop, so obtaining it
        # once here is equivalent to asking for it on every iteration.
        http = httpclient.AsyncHTTPClient()
        while True:
            await asyncio.sleep(self.delay)
            print(f"the name is {self.name}, and id is {self.id}")
            try:
                response = await http.fetch(url)
            except Exception as e:
                # Best-effort polling: report the failure and keep going.
                print(f"ERROR: {e}")
            else:
                print(type(response.headers))


async def main():
    """Publish the running loop, then run three pollers concurrently."""
    global now_loop
    now_loop = asyncio.get_running_loop()
    # Signal the launching thread that now_loop is safe to use.
    loop_ready.set()

    test = AsyncioTest('xx', 1, 1)
    test2 = AsyncioTest('yy', 2, 2)
    test3 = AsyncioTest('zz', 3, 3)
    await asyncio.gather(
        test.callname('http://www.baidu.com'),
        test2.callname('http://www.qq.com'),
        test3.callname('http://www.jd.com')
    )


# Run the asyncio event loop in a background thread.
t = threading.Thread(target=asyncio.run, args=(main(),))
t.start()

# Wait until main() has published its loop. A fixed sleep could read now_loop
# before the worker thread had assigned it.
if not loop_ready.wait(timeout=LOOP_START_TIMEOUT):
    raise RuntimeError("event loop thread did not start in time")
print(now_loop)

# Schedule one more poller onto the worker thread's loop from this thread.
task = AsyncioTest('xxx', 32, 1)
future = asyncio.run_coroutine_threadsafe(task.callname('http://www.baidu.com'), now_loop)

# Keep the main thread alive while the worker runs; stop once it has died.
while t.is_alive():
    time.sleep(1)
–
–
–
评论前必须登录!
注册