路漫漫其修远兮
吾将上下而求索

python学习:异步-协程-多循环-任务添加示例

import time
import asyncio
import requests
import threading
import tornado.ioloop
import tornado.web
from tornado import httpclient

#yum -y install zlib zlib-devel libffi-devel openssl openssl-devel readline-devel readline patch gcc gcc-c++ libcurl-devel
#./configure --prefix=/usr/local/Python-3.7.3 --enable-shared --enable-optimizations
#make && make install
#vim /etc/ld.so.conf.d/python-3.7.3.conf
#/usr/local/Python-3.7.3/lib/
#ldconfig

#python3.7 -m pip install --no-cache-dir --compile --ignore-installed --install-option="--with-nss" pycurl
#python3.7 -m pip install tornado requests

httpclient.AsyncHTTPClient.configure(
    "tornado.curl_httpclient.CurlAsyncHTTPClient"
)


class AsyncioTest(object):

    def __init__(self, name, id, delay):
        self.name = name
        self.id = id
        self.delay = delay

    async def callname(self, url):
        while True:
            await asyncio.sleep(self.delay)
            print(f"the name is {self.name}, and id is {self.id}")
            http = httpclient.AsyncHTTPClient()
            try:
                response = await http.fetch(url)
                print(type(response.headers))
            except Exception as e:
                print(f"ERROR: {e}")


async def main():
    global now_loop
    now_loop = asyncio.get_running_loop()
    test = AsyncioTest('xx', 1, 1)
    test2 = AsyncioTest('yy', 2, 2)
    test3 = AsyncioTest('zz', 3, 3)
    await asyncio.gather(
    test.callname('http://www.baidu.com'),
    test2.callname('http://www.qq.com'),
    test3.callname('http://www.jd.com')
    )
t = threading.Thread(target=asyncio.run, args=(main(),))
t.start()
time.sleep(0.1)
print(now_loop)
task = AsyncioTest('xxx', 32, 1)
future = asyncio.run_coroutine_threadsafe(task.callname('http://www.baidu.com'), now_loop)

while True:
    time.sleep(1)

未经允许不得转载:江哥架构师笔记 » python学习:异步-协程-多循环-任务添加示例

分享到:更多 ()

评论 抢沙发

评论前必须登录!