路漫漫其修远兮
吾将上下而求索

python学习:ansible示例

[root@master-20 ~/ansible_test]#cat playbook.py 
import logging
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase
from ansible.vars.manager import VariableManager
from ansible import context
from ansible.module_utils.common.collections import ImmutableDict

# Module-level logger: INFO and above, written to the console.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# One stream handler carries the timestamped record format.
formatter = logging.Formatter(
    '%(asctime)s [%(name)s] [%(levelname)s] %(message)s'
)
console = logging.StreamHandler()
console.setFormatter(formatter)
console.setLevel(logging.INFO)
logger.addHandler(console)


# Create a callback object so we can capture the output
class ResultsCollector(CallbackBase):
    """Callback that keeps the latest task result of every host.

    Results are stored in three dicts keyed by host name (``host_ok``,
    ``host_failed`` and ``host_unreachable``) and echoed to stdout as they
    arrive.
    """

    def __init__(self, *args, **kwargs):
        super(ResultsCollector, self).__init__(*args, **kwargs)
        self.host_ok = {}
        self.host_unreachable = {}
        self.host_failed = {}

    def v2_runner_on_unreachable(self, result):
        """Store and print the result of a host that could not be reached."""
        self.host_unreachable[result._host.get_name()] = result
        print("unreachable")
        print(result._result)

    def v2_runner_on_ok(self, result, *args, **kwargs):
        """Store a successful result and print its stdout when present."""
        self.host_ok[result._host.get_name()] = result
        print("successful")
        # Indexing ['stdout'] raises KeyError for results without that key,
        # so fall back to printing the whole result dict.
        payload = result._result
        print(payload.get('stdout', payload))

    def v2_runner_on_failed(self, result, *args, **kwargs):
        """Store a failed result and print the most specific error text."""
        self.host_failed[result._host.get_name()] = result
        print("failed")
        # Prefer stderr, then msg; print the full dict when neither exists
        # so the failure reason is never lost to a KeyError.
        payload = result._result
        print(payload.get('stderr') or payload.get('msg') or payload)

class AnsiblePlaybook(object):
    """Run one ad-hoc ``copy`` or ``shell`` task through the Ansible Python API.

    Args:
        host_list: Value used for the play's ``hosts`` field.
        module: Module to run; only 'copy' and 'shell' are handled.
        cmd: Command string passed to the shell module.
        path: Two-item sequence ``(src, dest)`` used by the copy module.
        forks: Value stored as ``forks`` in ``context.CLIARGS``.
        callback: Object passed to TaskQueueManager as ``stdout_callback``.
    """

    def __init__(self, host_list, module, cmd=None, path=None, forks=5, callback=None):
        self.forks = forks
        self.host_list = host_list
        self.module = module
        # Always define src/dest so a copy run without ``path`` is reported
        # as an error instead of failing with AttributeError.
        if path:
            self.src, self.dest = path[0], path[1]
        else:
            self.src = self.dest = None
        self.cmd = cmd
        self.callback = callback
        # Value returned by the most recent TaskQueueManager.run() call;
        # stays None until start() has actually run a play.
        self.run_status = None

    def _build_tasks(self):
        """Return the task list for ``self.module``, or None if it cannot be built."""
        if self.module == 'copy':
            if self.src is None or self.dest is None:
                logger.error("copy module requires path=(src, dest)")
                return None
            return [dict(action=dict(module=self.module,
                                     args=dict(src=self.src, dest=self.dest)))]
        if self.module == 'shell':
            return [dict(action=dict(module=self.module, args=dict(cmd=self.cmd)))]
        logger.error("module not found")
        return None

    def start(self):
        """Build the play and run it.

        Returns:
            False when no task could be built (unsupported module, or copy
            without a path); None otherwise. The value returned by
            ``TaskQueueManager.run`` is kept in ``self.run_status``.
        """
        # Validate first so nothing global is touched for an invalid request.
        tasks = self._build_tasks()
        if tasks is None:
            return False

        context.CLIARGS = ImmutableDict(connection='smart', forks=self.forks, become=None,
                                        become_method=None, become_user=None,
                                        check=False, diff=False)

        loader = DataLoader()
        passwords = dict()
        inventory = InventoryManager(loader=loader, sources='/etc/ansible/hosts')
        variable_manager = VariableManager(loader=loader, inventory=inventory)

        play_source = dict(
            name="Ansible Play",
            hosts=self.host_list,
            gather_facts='no',
            tasks=tasks
        )
        play = Play().load(play_source, variable_manager=variable_manager, loader=loader)

        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=inventory,
                variable_manager=variable_manager,
                loader=loader,
                passwords=passwords,
                stdout_callback=self.callback,
            )
            self.run_status = tqm.run(play)
        finally:
            # Release worker resources and temp files even when the run raises.
            if tqm is not None:
                tqm.cleanup()
            if loader:
                loader.cleanup_all_tmp_files()

if __name__ == '__main__':
    # Demo: run `ls` through the shell module on two hosts and let the
    # collector print each host's result.
    collector = ResultsCollector()
    play = AnsiblePlaybook(
        ['127.0.0.1', '192.168.170.21'],
        'shell',
        'ls',
        callback=collector,
    )
    # Alternative demo: copy a file instead of running a command.
    #play = AnsiblePlaybook(['10.216.4.14', '10.216.4.15'], 'copy', path = ['/opt/ansible_test/1.py', '/opt'], callback=ResultsCollector())
    play.start()
    
    
需要在 /etc/ansible/hosts 中添加主机配置:
[root@master-20 ~/ansible_test]#cat /etc/ansible/hosts 
[user]
192.168.170.20
192.168.170.22    

说明:
使用python3.6
使用ansible==2.8.2版本
运行时如果遇到问题,可以在回调中加入 print(result._result) 查看具体的报错信息,必要时再修改库的源码






====================================
另外一个可能有用的示例

#!/usr/bin/env python
#coding=utf-8
import os
import time
import json
import shutil
import logging
import pexpect
import datetime
import ansible.constants as C
from collections import namedtuple
from ansible.playbook.play import Play
from ansible.vars.manager import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.plugins.callback import CallbackBase
from ansible.inventory.manager import InventoryManager
from ansible.executor.task_queue_manager import TaskQueueManager

# Module-level logger; no handler or level is configured at this point.
logger = logging.getLogger(__name__)
# The string literal below holds a console-logging setup that is currently
# disabled: it is an unused expression, not executed code.
'''
logger.setLevel(level = logging.INFO)
formatter = logging.Formatter('%(asctime)s [%(name)s] [%(levelname)s] %(message)s')
console = logging.StreamHandler()
console.setLevel(logging.INFO)
console.setFormatter(formatter)
logger.addHandler(console)
'''

# ----> Callback
class DefaultCallback(CallbackBase):
    """Callback that only logs the per-host outcome of each task.

    Args:
        mysql: Database handle; stored but not used by this callback.
        task_info: Optional task description. Accepted so this class can be
            built with the same ``(mysql, task_info)`` arguments that
            RunPlaybook passes to its callback class.
    """

    def __init__(self, mysql, task_info=None):
        # super() must name this class: naming a class that is not in this
        # instance's MRO raises TypeError.
        super(DefaultCallback, self).__init__()
        self.mysql = mysql
        self.task_info = task_info

    def v2_runner_on_ok(self, result, **kwargs):
        """Log the host whose task succeeded."""
        logger.info('successful,%s', result._host)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Log the host whose task failed."""
        logger.info('failed,%s', result._host)

    def v2_runner_on_unreachable(self, result):
        """Log the host that could not be reached."""
        logger.info('unreachable,%s', result._host)
    

class ResultCallback(CallbackBase):
    """Callback that records successful runs in the database and logs outcomes.

    Args:
        mysql: Object providing ``insert_one(sql, params)``.
        task_info: Dict with a 'task_id' entry and a 'host_dic' iterable of
            dicts that each carry 'ip' and 'hid'.
    """

    def __init__(self, mysql, task_info):
        super(ResultCallback, self).__init__()
        self.mysql = mysql
        self.task_info = task_info

    def v2_runner_on_ok(self, result, **kwargs):
        """Insert one row per matching host entry, then log the success."""
        task_id = self.task_info['task_id']
        # %M is minutes; %m would repeat the month in the time part.
        finished_date = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        host = str(result._host)
        for entry in self.task_info['host_dic']:
            if entry['ip'] == host:
                self.mysql.insert_one(
                    "insert into tb_asb_task_info (host_id, task_id, finished_date, run_stat)"
                    "value(%s, %s, %s, 1)",
                    [entry['hid'], task_id, finished_date],
                )
        logger.info('successful,%s', host)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        """Log the failure message of a host."""
        # Fall back to the whole result dict when there is no 'msg' key, so
        # the callback itself cannot fail with KeyError.
        payload = result._result
        logger.error('failed,%s:%s', result._host, payload.get('msg', payload))

    def v2_runner_on_unreachable(self, result):
        """Log the host that could not be reached."""
        logger.info('unreachable,%s', result._host)


# ----> Start Ansible API Playbook
class RunPlaybook(object):
    """Run one ad-hoc module call against the hosts listed in ``task_info``.

    Args:
        task_info: Dict whose 'host_dic' entry is an iterable of dicts with
            an 'ip' key; also handed to the callback.
        module: Ansible module name for the single task.
        args: Arguments for that module.
        mysql: Database handle given to the callback and closed after a
            successful run; may be None when the callback needs no database.
        poll: Value for the task's ``poll`` field.
        async_: Value for the task's ``async`` field.
        cmd_callback: Callback class, instantiated as
            ``cmd_callback(mysql, task_info)``.
        forks: Number of forks placed in the options tuple.
        **kwargs: Only the legacy keyword ``async`` is accepted.

    Raises:
        TypeError: If an unknown keyword argument is supplied.
    """

    def __init__(self, task_info, module, args, mysql=None, poll=1, async_=60,
                 cmd_callback=DefaultCallback, forks=10, **kwargs):
        # ``async`` is a reserved word from Python 3.7 on, so the parameter is
        # spelled ``async_``. Callers on older interpreters that still pass
        # ``async=...`` by keyword are served through **kwargs.
        if 'async' in kwargs:
            async_ = kwargs.pop('async')
        if kwargs:
            raise TypeError('unexpected keyword arguments: %s'
                            % ', '.join(sorted(kwargs)))
        self.task_info = task_info
        self.hosts = ",".join(item['ip'] for item in task_info['host_dic'])
        self.module = module
        self.args = args
        self.mysql = mysql
        self.poll = poll
        self.async_ = async_
        self.cmd_callback = cmd_callback(self.mysql, self.task_info)
        self.forks = forks

    def start(self):
        """Build a one-task play for ``self.hosts`` and run it."""
        Options = namedtuple('Options', ['connection', 'module_path', 'forks', 'become',
                                         'become_method', 'become_user', 'check', 'diff'])
        options = Options(connection='smart', module_path=None, forks=self.forks,
                          become=None, become_method=None, become_user=None,
                          check=False, diff=False)
        loader = DataLoader()
        # A source string without a comma is read as an inventory file path,
        # so a trailing comma keeps a single-host string an inline host list.
        inventory = InventoryManager(loader=loader, sources=self.hosts + ',')
        variable_manager = VariableManager(loader=loader, inventory=inventory)
        task = dict(action=dict(module=self.module, args=self.args),
                    register='result', poll=self.poll)
        # Set by subscript: ``async`` cannot be written as a keyword argument
        # on Python 3.7+.
        task['async'] = self.async_
        play_source = dict(hosts=self.hosts, tasks=[task])
        play = Play().load(play_source, variable_manager=variable_manager, loader=loader)
        tqm = None
        try:
            tqm = TaskQueueManager(
                inventory=inventory,
                variable_manager=variable_manager,
                loader=loader,
                options=options,
                passwords=None,
                stdout_callback=self.cmd_callback,
            )
            logger.info('------Start Run Palybook------')
            tqm.run(play)
            if self.mysql is not None:
                self.mysql.close()
            logger.info('------End Run Palybook------')
        finally:
            # Always release worker resources and Ansible's local temp dir.
            if tqm is not None:
                tqm.cleanup()
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

# ----> Send " id_rsa.pub " Key
class PublicKey(object):
    """Push an SSH public key to remote hosts with ``ssh-copy-id``.

    Args:
        user: Remote account name.
        passwd: Password typed at the remote password prompt.
        host: Iterable of host names or addresses.
        port: SSH port passed to ``ssh-copy-id -p``.
        keyfile: Public key file; ``ssh-keygen`` is run when it is missing.
    """

    def __init__(self, user, passwd, host, port=22, keyfile='/root/.ssh/id_rsa.pub'):
        self.user = user
        self.passwd = passwd
        self.host = host
        self.keyfile = keyfile
        self.port = port

    def _generate_key(self):
        """Run ``ssh-keygen -t rsa`` and answer each prompt with Enter."""
        key = pexpect.spawn('ssh-keygen -t rsa')
        for prompt in ('id_rsa', 'passphrase', 'again:'):
            # expect() returns the matched index, 0 for a single pattern.
            if not key.expect(prompt):
                key.send('\n')
                time.sleep(2)
        logger.info('--------Key create successfull--------')

    def send(self):
        """Create the key pair if needed, then copy the key to every host."""
        if not os.path.isfile(self.keyfile):
            self._generate_key()
        else:
            logger.info('--------key Exist --------')
        for target in self.host:
            child = pexpect.spawn('ssh-copy-id -i %s -p %s %s@%s'
                                  % (self.keyfile, self.port, self.user, target))
            try:
                # Patterns are regular expressions; index 1 is the
                # host-key confirmation question.
                matched = child.expect(['password:', 'continue connecting (yes/no)?'])
                if matched == 1:
                    # sendline() appends the newline itself.
                    child.sendline('yes')
                    child.expect('password: ')
                child.sendline(self.passwd)
                time.sleep(2)
                logger.info('%s:%s Send Key Successfull..ok', target, self.user)
            except pexpect.EOF as exc:
                # The child exited before a prompt appeared; log the part of
                # pexpect's message that holds the child's buffered output.
                for line in str(exc).split("\n"):
                    if 'before ' in line:
                        logger.info('%s %s', target,
                                    ':'.join(line.split(':')[2:]).split('.')[0])
            except Exception as exc:
                logger.error('PexpectException:%s', exc)
            finally:
                # Do not leave the spawned process and its pty open.
                child.close()


if __name__ == '__main__':
    # Demo: run `ifconfig` with the command module on two hosts.
    # NOTE(review): this call does not pass the `mysql` argument that
    # RunPlaybook.__init__ declares - confirm which database handle is meant.
    task_dic = {
        'host_dic': (
            {"hid": 1, 'ip': '192.168.1.34'},
            {'hid': 2, 'ip': '192.168.1.35'},
        ),
        "task_id": '1',
    }
    playbook = RunPlaybook(task_dic, 'command', 'ifconfig')
    playbook.start()
    # Further usage sketches, kept disabled:
    #lst = []
    #for i in range(34,38):
    #    lst.append('192.168.1.%s' %i)
    #playbook = RunPlaybook(lst, 'copy', 'src=/etc/ansible/playbook/test2.yml dest=/root/',0 ,0)
    #playbook.start()
    #run = PublicKey('root' ,'password', lst)
    #run.send()

未经允许不得转载:江哥架构师笔记 » python学习:ansible示例

分享到:更多 ()

评论 抢沙发

评论前必须登录!