A Hands-On Guide to Running Ansible Playbooks from Python

Contents

    • Calling ansible-playbook
      • Find the command path
      • Find the source code and analyze it
      • Simplify the call
    • Interaction after the call
      • Analysis of an Ansible run
        • cli.run()
        • PlaybookExecutor.run()
        • TaskQueueManager.run()
        • strategy.run()
        • TaskQueueManager.send_callback()
      • Final code

    I won't give an introduction to Ansible here. In short, operating and maintaining distributed systems means batch system configuration, batch program deployment, running commands in batches and so on, and Ansible is a powerful tool for this that lets you get twice the result with half the effort.

    However, as a CLI tool, its usage is limited by the command line: you cannot interact with it more deeply or apply your own logic while it is running. Ansible itself is written in Python, so it can be driven directly from Python scripts. But the official documentation says very little about this, and there is little material online either, so this post tries to offer some guidance.

    First of all, I'm not a Python expert, so I can't work out the whole Ansible architecture just by reading the code line by line. The official site also says the Ansible code keeps being refactored (the handful of examples I found online bear this out: Python examples of calling Ansible hardly ever run on newer versions of Ansible). So what we need is an approach: start from the CLI.

    Calling ansible-playbook

    Let's take ansible-playbook as an example and show, step by step, how to start from the CLI and find a way to call Ansible from Python.

    Find the command path

    $ which ansible-playbook 
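The same lookup can also be done from inside Python, which is handy when a script needs to confirm that Ansible is installed before going further. A minimal sketch using the standard library's shutil.which, which searches PATH the way which does (find_command is our own illustrative name, not part of Ansible):

```python
import shutil
from typing import Optional


def find_command(name: str = "ansible-playbook") -> Optional[str]:
    """Return the full path of `name` if it is found on PATH, else None.

    Behaves like the shell's `which`; the helper name is illustrative only.
    """
    return shutil.which(name)


if __name__ == "__main__":
    path = find_command()
    print(path if path else "ansible-playbook was not found on PATH")
```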

    Find the source code and analyze it

    Use the command above to locate ansible-playbook, then open the file (no need to read every line of it):

    # (c) 2012, Michael DeHaan <michael.dehaan@gmail.com>
    # This file is part of Ansible
    # Ansible is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    # Ansible is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    # You should have received a copy of the GNU General Public License
    # along with Ansible.  If not, see <http://www.gnu.org/licenses/>.
    from __future__ import (absolute_import, division, print_function)
    __metaclass__ = type
    __requires__ = ['ansible']
    import os
    import shutil
    import sys
    import traceback
    from ansible import context
    from ansible.errors import AnsibleError, AnsibleOptionsError, AnsibleParserError
    from ansible.module_utils._text import to_text
    # Used for determining if the system is running a new enough python version
    # and should only restrict on our documented minimum versions
    _PY3_MIN = sys.version_info[:2] >= (3, 5)
    _PY2_MIN = (2, 6) <= sys.version_info[:2] < (3,)
    _PY_MIN = _PY3_MIN or _PY2_MIN
    if not _PY_MIN:
        raise SystemExit('ERROR: Ansible requires a minimum of Python2 version 2.6 or Python3 version 3.5. Current version: %s' % ''.join(sys.version.splitlines()))


    class LastResort(object):
        def display(self, msg, log_only=None):
            print(msg, file=sys.stderr)

        def error(self, msg, wrap_text=None):
            print(msg, file=sys.stderr)


    if __name__ == '__main__':
        display = LastResort()
        try:  # bad ANSIBLE_CONFIG or config options can force ugly stacktrace
            import ansible.constants as C
            from ansible.utils.display import Display
        except AnsibleOptionsError as e:
            display.error(to_text(e), wrap_text=False)
            sys.exit(5)

        cli = None
        me = os.path.basename(sys.argv[0])
        try:
            display = Display()
            display.debug("starting run")
            sub = None
            target = me.split('-')
            if target[-1][0].isdigit():
                # Remove any version or python version info as downstreams
                # sometimes add that
                target = target[:-1]
            if len(target) > 1:
                sub = target[1]
                myclass = "%sCLI" % sub.capitalize()
            elif target[0] == 'ansible':
                sub = 'adhoc'
                myclass = 'AdHocCLI'
            else:
                raise AnsibleError("Unknown Ansible alias: %s" % me)
            try:
                mycli = getattr(__import__("ansible.cli.%s" % sub, fromlist=[myclass]), myclass)
            except ImportError as e:
                # ImportError members have changed in py3
                if 'msg' in dir(e):
                    msg = e.msg
                else:
                    msg = e.message
                if msg.endswith(' %s' % sub):
                    raise AnsibleError("Ansible sub-program not implemented: %s" % me)
                else:
                    raise
            try:
                args = [to_text(a, errors='surrogate_or_strict') for a in sys.argv]
            except UnicodeError:
                display.error('Command line args are not in utf-8, unable to continue.  Ansible currently only understands utf-8')
                display.display(u"The full traceback was:\n\n%s" % to_text(traceback.format_exc()))
                exit_code = 6
            else:
                cli = mycli(args)
                exit_code = cli.run()
        except AnsibleOptionsError as e:
            display.error(to_text(e), wrap_text=False)
            exit_code = 5
        except AnsibleParserError as e:
            display.error(to_text(e), wrap_text=False)
            exit_code = 4
    # TQM takes care of these, but leaving comment to reserve the exit codes
    #    except AnsibleHostUnreachable as e:
    #        display.error(str(e))
    #        exit_code = 3
    #    except AnsibleHostFailed as e:
    #        display.error(str(e))
    #        exit_code = 2
        except AnsibleError as e:
            display.error(to_text(e), wrap_text=False)
            exit_code = 1
        except KeyboardInterrupt:
            display.error("User interrupted execution")
            exit_code = 99
        except Exception as e:
            if C.DEFAULT_DEBUG:
                # Show raw stacktraces in debug mode, It also allow pdb to
                # enter post mortem mode.
                raise
            have_cli_options = bool(context.CLIARGS)
            display.error("Unexpected Exception, this is probably a bug: %s" % to_text(e), wrap_text=False)
            if not have_cli_options or have_cli_options and context.CLIARGS['verbosity'] > 2:
                log_only = False
                if hasattr(e, 'orig_exc'):
                    display.vvv('\nexception type: %s' % to_text(type(e.orig_exc)))
                    why = to_text(e.orig_exc)
                    if to_text(e) != why:
                        display.vvv('\noriginal msg: %s' % why)
            else:
                display.display("to see the full traceback, use -vvv")
                log_only = True
            display.display(u"the full traceback was:\n\n%s" % to_text(traceback.format_exc()), log_only=log_only)
            exit_code = 250
        finally:
            # Remove ansible tmpdir
            shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)

        sys.exit(exit_code)

    As you can see, most of the content is exception handling. Skipping that, the most important lines are:

    if len(target) > 1:
        sub = target[1]
        myclass = "%sCLI" % sub.capitalize()
    elif target[0] == 'ansible':
        sub = 'adhoc'
        myclass = 'AdHocCLI'
    else:
        raise AnsibleError("Unknown Ansible alias: %s" % me)

    mycli = getattr(__import__("ansible.cli.%s" % sub, fromlist=[myclass]), myclass)
    # ...
    cli = mycli(args)
    exit_code = cli.run()

    From these lines we can locate the ansible.cli package directory, which contains the various supported CLIs:

    ├── arguments
    │   ├──
    │   ├── __pycache__
    │   │   ├── __init__.cpython-36.pyc
    │   │   └── optparse_helpers.cpython-36.pyc
    │   └──

    Calling it is also very simple: we only need to figure out what args contains, and we can find that out by adding print(args).
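The name-to-class dispatch in the entry script is easy to reproduce in isolation, which makes it clear why ansible-playbook ends up at PlaybookCLI. The sketch below re-implements only that part; resolve_cli is a hypothetical helper, and it returns the module and class names instead of importing them, so it runs without Ansible installed:

```python
import os


def resolve_cli(argv0):
    """Map a program name such as 'ansible-playbook' to (module, class name).

    Re-implements the dispatch logic of the entry script without importing Ansible.
    """
    me = os.path.basename(argv0)
    target = me.split('-')
    # Drop a trailing version suffix, e.g. 'ansible-playbook-3.6'
    if target[-1][0].isdigit():
        target = target[:-1]

    if len(target) > 1:
        sub = target[1]
        myclass = "%sCLI" % sub.capitalize()
    elif target[0] == 'ansible':
        sub = 'adhoc'
        myclass = 'AdHocCLI'
    else:
        # the real script raises AnsibleError here
        raise ValueError("Unknown Ansible alias: %s" % me)
    return "ansible.cli.%s" % sub, myclass


print(resolve_cli("/usr/local/bin/ansible-playbook"))
# ('ansible.cli.playbook', 'PlaybookCLI')
```

Feeding the two returned names to importlib.import_module and getattr would load the same class that the script obtains with __import__.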

    Simplify the call

    In the end, the code above can be simplified to the following:

    from ansible.cli.playbook import PlaybookCLI

    mycli = PlaybookCLI
    # The first element stands in for the program name (sys.argv[0])
    cli = mycli([" ", '-i', 'hosts.uat', 'kibana_deploy_plugin.yml'])
    exit_code = cli.run()

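    Note the argument list [" ", '-i', 'hosts.uat', 'kibana_deploy_plugin.yml']. It has the same format as the arguments we pass when running ansible-playbook, except that it must be provided as a list, and its first element stands in for the program name, because the entry script passes the whole of sys.argv to the CLI class.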

    Run it, and here is the result:

    PLAY [Deploy kibana optimize] **************************************************
    TASK [Gathering Facts] *********************************************************
    fatal: [BI-LASS-Kibana_10.60.x.x]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out", "unreachable": true}
    fatal: [BI-LASS-Kibana_10.50.x.x]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: ssh: connect to host 10.50.x.x port 22: Operation timed out", "unreachable": true}
    PLAY RECAP *********************************************************************
    BI-LASS-Kibana_10.60.x.x : ok=0    changed=0    unreachable=1    failed=0    skipped=0    rescued=0    ignored=0   
    BI-LASS-Kibana_10.50.x.x : ok=0    changed=0    unreachable=1    failed=0    skipped=0    rescued=0    ignored=0   
    Process finished with exit code 0

    We can see that the script works, so at this point we can call ansible-playbook from Python.
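Because the list passed to PlaybookCLI is just the command line split into words, it can be built from a familiar command string with shlex.split, which also keeps quoted values with spaces together. A small sketch (build_cli_args is a hypothetical helper; its first element plays the same program-name role as the " " placeholder above):

```python
import shlex


def build_cli_args(command_line, prog="ansible-playbook"):
    """Build the argument list for PlaybookCLI from an option string.

    The first element takes the place of sys.argv[0]; the rest is split with
    shell quoting rules, so a quoted value containing spaces stays one item.
    """
    return [prog] + shlex.split(command_line)


args = build_cli_args("-i hosts.uat kibana_deploy_plugin.yml -e 'env=uat version=7'")
print(args)
# ['ansible-playbook', '-i', 'hosts.uat', 'kibana_deploy_plugin.yml', '-e', 'env=uat version=7']
```

The returned list can then be passed to PlaybookCLI(args) in place of the hand-written one.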

    Interaction after the call

    Being able to call it is not enough. We need interaction: we need to get the results of the tasks as they run and do further analysis and logical processing based on them, so we have to study the code in more depth.

    Analysis of an Ansible run

    First, take a look at the PlaybookCLI.run() function:

    def run(self):
        super(PlaybookCLI, self).run()

        # Note: slightly wrong, this is written so that implicit localhost
        # manages passwords
        sshpass = None
        becomepass = None
        passwords = {}

        # initial error check, to make sure all specified playbooks are accessible
        # before we start running anything through the playbook executor
        b_playbook_dirs = []
        for playbook in context.CLIARGS['args']:
            if not os.path.exists(playbook):
                raise AnsibleError("the playbook: %s could not be found" % playbook)
            if not (os.path.isfile(playbook) or stat.S_ISFIFO(os.stat(playbook).st_mode)):
                raise AnsibleError("the playbook: %s does not appear to be a file" % playbook)
            b_playbook_dir = os.path.dirname(os.path.abspath(to_bytes(playbook, errors='surrogate_or_strict')))
            # load plugins from all playbooks in case they add callbacks/inventory/etc
            add_all_plugin_dirs(b_playbook_dir)
            b_playbook_dirs.append(b_playbook_dir)

        # don't deal with privilege escalation or passwords when we don't need to
        if not (context.CLIARGS['listhosts'] or context.CLIARGS['listtasks'] or
                context.CLIARGS['listtags'] or context.CLIARGS['syntax']):
            (sshpass, becomepass) = self.ask_passwords()
            passwords = {'conn_pass': sshpass, 'become_pass': becomepass}

        # create base objects
        loader, inventory, variable_manager = self._play_prereqs()

        # Note: slightly wrong, this is written so that implicit localhost
        # (which is not returned in list_hosts()) is taken into account for
        # warning if inventory is empty.  But it can't be taken into account for
        # checking if limit doesn't match any hosts.  Instead we don't worry about
        # limit if only implicit localhost was in inventory to start with.
        # Fix this when we rewrite inventory by making localhost a real host (and thus show up in list_hosts())
        CLI.get_host_list(inventory, context.CLIARGS['subset'])

        # flush fact cache if requested
        if context.CLIARGS['flush_cache']:
            self._flush_cache(inventory, variable_manager)

        # create the playbook executor, which manages running the plays via a task queue manager
        pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
                                variable_manager=variable_manager, loader=loader,
                                passwords=passwords)

        results = pbex.run()

        if isinstance(results, list):
            for p in results:
                display.display('\nplaybook: %s' % p['playbook'])
                for idx, play in enumerate(p['plays']):
                    if play._included_path is not None:
                        loader.set_basedir(play._included_path)
                    else:
                        pb_dir = os.path.realpath(os.path.dirname(p['playbook']))
                        loader.set_basedir(pb_dir)

                    msg = "\n  play #%d (%s): %s" % (idx + 1, ','.join(play.hosts), play.name)
                    mytags = set(play.tags)
                    msg += '\tTAGS: [%s]' % (','.join(mytags))

                    if context.CLIARGS['listhosts']:
                        playhosts = set(inventory.get_hosts(play.hosts))
                        msg += "\n    pattern: %s\n    hosts (%d):" % (play.hosts, len(playhosts))
                        for host in playhosts:
                            msg += "\n      %s" % host

                    display.display(msg)

                    all_tags = set()
                    if context.CLIARGS['listtags'] or context.CLIARGS['listtasks']:
                        taskmsg = ''
                        if context.CLIARGS['listtasks']:
                            taskmsg = '    tasks:\n'

                        def _process_block(b):
                            taskmsg = ''
                            for task in b.block:
                                if isinstance(task, Block):
                                    taskmsg += _process_block(task)
                                else:
                                    if task.action == 'meta':
                                        continue

                                    all_tags.update(task.tags)
                                    if context.CLIARGS['listtasks']:
                                        cur_tags = list(mytags.union(set(task.tags)))
                                        cur_tags.sort()
                                        if task.name:
                                            taskmsg += "      %s" % task.get_name()
                                        else:
                                            taskmsg += "      %s" % task.action
                                        taskmsg += "\tTAGS: [%s]\n" % ', '.join(cur_tags)
                            return taskmsg

                        all_vars = variable_manager.get_vars(play=play)
                        for block in play.compile():
                            block = block.filter_tagged_tasks(all_vars)
                            if not block.has_tasks():
                                continue
                            taskmsg += _process_block(block)

                        if context.CLIARGS['listtags']:
                            cur_tags = list(mytags.union(all_tags))
                            cur_tags.sort()
                            taskmsg += "      TASK TAGS: [%s]\n" % ', '.join(cur_tags)

                        display.display(taskmsg)

            return 0
        else:
            return results

    The function is still very long, but the key part is:

    # create the playbook executor, which manages running the plays via a task queue manager
    pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
                            variable_manager=variable_manager, loader=loader,
                            passwords=passwords)

    results = pbex.run()

    Here the execution process is wrapped in a PlaybookExecutor, which manages running the plays through a task queue manager, and it is started with pbex.run().
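Back at the top of PlaybookCLI.run(), the initial playbook check only needs os and stat, so the same validation can be done on our side before handing paths to Ansible. A sketch that mirrors the two checks in that loop (check_playbooks is our own name; it raises ValueError where Ansible raises AnsibleError, and it leaves out the plugin-directory handling):

```python
import os
import stat


def check_playbooks(playbooks):
    """Mirror the initial check in PlaybookCLI.run(): every playbook must exist
    and be a regular file or a FIFO. Returns each playbook's absolute directory."""
    playbook_dirs = []
    for playbook in playbooks:
        if not os.path.exists(playbook):
            raise ValueError("the playbook: %s could not be found" % playbook)
        if not (os.path.isfile(playbook) or stat.S_ISFIFO(os.stat(playbook).st_mode)):
            raise ValueError("the playbook: %s does not appear to be a file" % playbook)
        playbook_dirs.append(os.path.dirname(os.path.abspath(playbook)))
    return playbook_dirs
```

For example, check_playbooks(['kibana_deploy_plugin.yml']) fails early with a readable message if the file is missing, instead of failing inside the executor.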

    Following PlaybookExecutor.run(), the key code is:

    class PlaybookExecutor:
        '''
        This is the primary class for executing playbooks, and thus the
        basis for bin/ansible-playbook operation.
        '''

        def __init__(self, playbooks, inventory, variable_manager, loader, passwords):
            self._playbooks = playbooks
            self._inventory = inventory
            self._variable_manager = variable_manager
            self._loader = loader
            self.passwords = passwords
            self._unreachable_hosts = dict()

            if context.CLIARGS.get('listhosts') or context.CLIARGS.get('listtasks') or \
                    context.CLIARGS.get('listtags') or context.CLIARGS.get('syntax'):
                self._tqm = None
            else:
                self._tqm = TaskQueueManager(
                    inventory=inventory,
                    variable_manager=variable_manager,
                    loader=loader,
                    passwords=self.passwords,
                    forks=context.CLIARGS.get('forks'),
                )

        def run(self):
            '''
            Run the given playbook, based on the settings in the play which
            may limit the runs to serialized groups, etc.
            '''
            # ... (loop over each playbook and each play omitted)
                        if self._tqm is None:
                            # we are just doing a listing
                            entry['plays'].append(play)
                        else:
                            previously_failed = len(self._tqm._failed_hosts)
                            previously_unreachable = len(self._tqm._unreachable_hosts)
                            break_play = False
                            # we are actually running plays
                            batches = self._get_serialized_batches(play)
                            if len(batches) == 0:
                                self._tqm.send_callback('v2_playbook_on_play_start', play)
                                self._tqm.send_callback('v2_playbook_on_no_hosts_matched')
                            for batch in batches:
                                # restrict the inventory to the hosts in the serialized batch
                                self._inventory.restrict_to_hosts(batch)
                                # and run it...
                                result = self._tqm.run(play=play)
                                # ...

    A few things stand out:

    • if the parameters include listhosts, listtasks, listtags or syntax, the playbook is not actually run; information about the playbook is returned instead
    • if it does need to run, each serialized batch is run through the TaskQueueManager with self._tqm.run(play=play)
    • so next we need to look into TaskQueueManager.run(play=play)
    • within each batch, hosts are processed concurrently, limited by forks (such concurrency is not thread-safe)
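To picture the batches that _get_serialized_batches produces from a play's serial setting, here is a simplified re-implementation. It is a sketch under stated assumptions, not Ansible's code: serialized_batches is our own function and accepts only None, an integer, or a list of integers (Ansible also accepts percentages such as "30%"). As we understand the real method, it treats a missing or non-positive size as "all remaining hosts" and keeps reusing the last list entry once the list runs out, and the sketch follows that:

```python
def serialized_batches(hosts, serial=None):
    """Split hosts into batches the way a play's `serial` setting does.

    Simplified: `serial` may be None, an int, or a list of ints;
    percentage strings are not handled.
    """
    if serial is None:
        serial_list = []
    elif isinstance(serial, int):
        serial_list = [serial]
    else:
        serial_list = list(serial)
    if not serial_list:
        serial_list = [-1]  # no serial given: one batch with every host

    remaining = list(hosts)
    batches = []
    cur_item = 0
    while remaining:
        size = serial_list[cur_item]
        if size <= 0:
            # unset or invalid size: everything left goes into one batch
            batches.append(remaining)
            break
        batches.append(remaining[:size])
        remaining = remaining[size:]
        # advance through the list, then keep reusing its last entry
        cur_item = min(cur_item + 1, len(serial_list) - 1)
    return batches


print(serialized_batches(["h1", "h2", "h3", "h4", "h5"], [1, 2]))
# [['h1'], ['h2', 'h3'], ['h4', 'h5']]
```

Each batch then gets its own pass through the TaskQueueManager, while the hosts inside one batch are worked on concurrently.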


    Following TaskQueueManager.run(), a few things can be seen from the code below:

    • the default strategy is the linear strategy
    • under the linear strategy, hosts move in lock-step: only once a task has been completed on all hosts does execution proceed to the next task
    def run(self, play):
        '''
        Iterates over the roles/tasks in a play, using the given (or default)
        strategy for queueing tasks. The default is the linear strategy, which
        operates like classic Ansible by keeping all hosts in lock-step with
        a given task (meaning no hosts move on to the next task until all hosts
        are done with the current task).
        '''

        if not self._callbacks_loaded:
            self.load_callbacks()

        all_vars = self._variable_manager.get_vars(play=play)
        templar = Templar(loader=self._loader, variables=all_vars)

        new_play = play.copy()
        new_play.post_validate(templar)
        new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers

        self.hostvars = HostVars(
            inventory=self._inventory,
            variable_manager=self._variable_manager,
            loader=self._loader,
        )

        play_context = PlayContext(new_play, self.passwords, self._connection_lockfile.fileno())
        if (self._stdout_callback and
                hasattr(self._stdout_callback, 'set_play_context')):
            self._stdout_callback.set_play_context(play_context)

        for callback_plugin in self._callback_plugins:
            if hasattr(callback_plugin, 'set_play_context'):
                callback_plugin.set_play_context(play_context)

        self.send_callback('v2_playbook_on_play_start', new_play)

        # build the iterator
        iterator = PlayIterator(
            inventory=self._inventory,
            play=new_play,
            play_context=play_context,
            variable_manager=self._variable_manager,
            all_vars=all_vars,
            start_at_done=self._start_at_done,
        )

        # adjust to # of workers to configured forks or size of batch, whatever is lower
        self._initialize_processes(min(self._forks, iterator.batch_size))

        # load the specified strategy (or the default linear one)
        strategy = strategy_loader.get(new_play.strategy, self)
        if strategy is None:
            raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)

        # Because the TQM may survive multiple play runs, we start by marking
        # any hosts as failed in the iterator here which may have been marked
        # as failed in previous runs. Then we clear the internal list of failed
        # hosts so we know what failed this round.
        for host_name in self._failed_hosts.keys():
            host = self._inventory.get_host(host_name)
            iterator.mark_host_failed(host)

        self.clear_failed_hosts()

        # during initialization, the PlayContext will clear the start_at_task
        # field to signal that a matching task was found, so check that here
        # and remember it so we don't try to skip tasks on future plays
        if context.CLIARGS.get('start_at_task') is not None and play_context.start_at_task is None:
            self._start_at_done = True

        # and run the play using the strategy and cleanup on way out
        play_return = strategy.run(iterator, play_context)

        # now re-save the hosts that failed from the iterator to our internal list
        for host_name in iterator.get_failed_hosts():
            self._failed_hosts[host_name] = True

        strategy.cleanup()
        self._cleanup_processes()
        return play_return
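The lock-step behaviour described in that docstring can be illustrated with a toy model. It is not Ansible's PlayIterator or strategy code: run_linear is our own function that records the order in which (task, host) pairs would run, and it drops a host from later tasks once it fails, which is what Ansible does by default when errors are not ignored:

```python
def run_linear(tasks, hosts, failures=()):
    """Toy model of the linear strategy.

    Every remaining host runs task N before any host starts task N+1.
    `failures` is a collection of (task, host) pairs that fail; a failed
    host is left out of all later tasks.
    """
    failures = set(failures)
    failed_hosts = set()
    order = []
    for task in tasks:
        hosts_left = [h for h in hosts if h not in failed_hosts]
        if not hosts_left:
            break  # nothing left to run on
        for host in hosts_left:
            order.append((task, host))
            if (task, host) in failures:
                failed_hosts.add(host)
    return order


print(run_linear(["gather", "deploy"], ["web1", "web2"], failures=[("gather", "web2")]))
# [('gather', 'web1'), ('gather', 'web2'), ('deploy', 'web1')]
```

In the real linear strategy the hosts of one task are handled in parallel by the worker processes; the toy runs them one after another, but the ordering between tasks is the same.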

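TaskQueueManager.run() hands the actual work to strategy.run(iterator, play_context), so we still need to go into the strategy to find out how tasks are executed.

Look at this part of the linear strategy's code: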

    def run(self, iterator, play_context):
        '''
        The linear strategy is simple - get the next task and queue
        it for all hosts, then wait for the queue to drain before
        moving on to the next task
        '''

        # iterate over each task, while there is one left to run
        result = self._tqm.RUN_OK
        work_to_do = True
        while work_to_do and not self._tqm._terminated:

            try:
                display.debug("getting the remaining hosts for this loop")
                hosts_left = self.get_hosts_left(iterator)
                display.debug("done getting the remaining hosts for this loop")

                # queue up this task for each host in the inventory
                callback_sent = False
                work_to_do = False

                host_results = []
                host_tasks = self._get_next_task_lockstep(hosts_left, iterator)

                # ... (queueing the task on each host omitted)

                    results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))

                # ... (omitted)
            except (IOError, EOFError) as e:
                display.debug("got IOError/EOFError in task loop: %s" % e)
                # most likely an abort, return failed
                return self._tqm.RUN_UNKNOWN_ERROR

        return super(StrategyModule, self).run(iterator, play_context, result)

(This function is long, so I have omitted most of it.)

The key line here is results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1))). In other words, results are handled by the _process_pending_results function, which makes heavy use of callbacks:

    if task_result.is_failed() or task_result.is_unreachable():
        self._tqm.send_callback('v2_runner_item_on_failed', task_result)
    elif task_result.is_skipped():
        self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
    else:
        if 'diff' in task_result._result:
            if self._diff or getattr(original_task, 'diff', False):
                self._tqm.send_callback('v2_on_file_diff', task_result)
        self._tqm.send_callback('v2_runner_item_on_ok', task_result)
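Two details from these lines can be checked in isolation: the max_passes bound that the linear strategy hands to _process_pending_results, and which callback name an item result is sent to. A sketch with hypothetical helper names (max_passes, item_callback_name), where plain booleans stand in for the is_failed(), is_unreachable() and is_skipped() methods of the task result:

```python
def max_passes(num_workers):
    """Bound used by the linear strategy: 10% of the worker count, at least 1."""
    return max(1, int(num_workers * 0.1))


def item_callback_name(failed=False, unreachable=False, skipped=False):
    """Pick the callback that the strategy sends for a loop item's result."""
    if failed or unreachable:
        # unreachable items are reported through the 'failed' callback
        return 'v2_runner_item_on_failed'
    if skipped:
        return 'v2_runner_item_on_skipped'
    # success; v2_on_file_diff may be sent first when a diff is present
    return 'v2_runner_item_on_ok'


print(max_passes(5), max_passes(25))  # 1 2
print(item_callback_name(unreachable=True))  # v2_runner_item_on_failed
```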


These callbacks are all sent through the TaskQueueManager.

From its send_callback() method below, we can see that each callback is delivered to _stdout_callback as well as to the other callback plugins:

    def send_callback(self, method_name, *args, **kwargs):
        for callback_plugin in [self._stdout_callback] + self._callback_plugins:
            # a plugin that set self.disabled to True will not be called
            # see example for such a plugin
            if getattr(callback_plugin, 'disabled', False):
                continue

            # try to find v2 method, fallback to v1 method, ignore callback if no method found
            methods = []
            for possible in [method_name, 'v2_on_any']:
                gotit = getattr(callback_plugin, possible, None)
                if gotit is None:
                    gotit = getattr(callback_plugin, possible.replace('v2_', ''), None)
                if gotit is not None:
                    methods.append(gotit)

            # ... (each method found is then called with the callback arguments)

Therefore, as long as we replace the TaskQueueManager's _stdout_callback with our own callback object, we can receive the intermediate results.
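To see why replacing _stdout_callback is enough, here is a stdlib-only model of the lookup rules in send_callback(): disabled plugins are skipped, the v2_ method name is tried first with the v1 name (without the v2_ prefix) as a fallback, and v2_on_any is called as well when a plugin defines it. The function and plugin classes below are illustrations rather than Ansible objects, and whatever the real method does after collecting the methods is left out:

```python
def send_callback(plugins, method_name, *args, **kwargs):
    """Toy version of TaskQueueManager.send_callback's method lookup."""
    for plugin in plugins:
        # a plugin that sets disabled = True is never called
        if getattr(plugin, 'disabled', False):
            continue
        methods = []
        for possible in [method_name, 'v2_on_any']:
            gotit = getattr(plugin, possible, None)
            if gotit is None:
                # fall back to the v1 name, e.g. 'runner_on_ok'
                gotit = getattr(plugin, possible.replace('v2_', ''), None)
            if gotit is not None:
                methods.append(gotit)
        for method in methods:
            method(*args, **kwargs)


class Collector:
    """Stands in for a stdout callback that records what it receives."""
    def __init__(self):
        self.seen = []

    def v2_runner_on_ok(self, result):
        self.seen.append(('ok', result))


class LegacyPlugin:
    """Only implements the v1 name, so it is reached through the fallback."""
    def __init__(self):
        self.seen = []

    def runner_on_ok(self, result):
        self.seen.append(('v1 ok', result))


class Muted:
    disabled = True

    def v2_runner_on_ok(self, result):
        raise AssertionError("disabled plugins must not be called")


stdout_callback, legacy = Collector(), LegacyPlugin()
send_callback([stdout_callback, legacy, Muted()], 'v2_runner_on_ok', {'changed': False})
print(stdout_callback.seen)  # [('ok', {'changed': False})]
print(legacy.seen)           # [('v1 ok', {'changed': False})]
```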

Final code

Skipping the remaining intermediate analysis, here is the code:

from ansible.cli.playbook import PlaybookCLI
from ansible.plugins.callback import CallbackBase
import json
from ansible.cli import CLI
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible import context
from ansible import constants as C


class ResultCallback(CallbackBase):
    """A sample callback plugin used for performing an action as results come in

    If you want to collect all results into a single object for processing at
    the end of the execution, look into utilizing the ``json`` callback plugin
    or writing your own custom callback plugin
    """

    def v2_runner_on_ok(self, result, **kwargs):
        """Print a json representation of the result

        This method could store the result in an instance attribute for retrieval later
        """
        host = result._host
        print(json.dumps({host.name: result._result}, indent=4))

    def v2_runner_on_failed(self, result, **kwargs):
        host = result._host.get_name()
        self.runner_on_failed(host, result._result, False)

    def v2_runner_on_unreachable(self, result):
        host = result._host.get_name()
        # print in the format shown in the output below
        print('===v2_runner_on_unreachable====host=%s===result=%s' % (host, result._result))

    def v2_runner_on_skipped(self, result):
        host = result._host.get_name()
        self.runner_on_skipped(host, self._get_item(getattr(result._result, 'results', {})))
        print("this task was not executed, please check the parameters or conditions.")

    def v2_playbook_on_stats(self, stats):
        print('===========play executes completed========')


# The first element stands in for the program name (sys.argv[0])
cli = PlaybookCLI([" ", '-i', 'hosts.uat', 'kibana_deploy_plugin.yml'])
# Parse the arguments so that context.CLIARGS is populated
cli.parse()

loader, inventory, variable_manager = cli._play_prereqs()

CLI.get_host_list(inventory, context.CLIARGS['subset'])

pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
                        variable_manager=variable_manager, loader=loader,
                        passwords={})  # no SSH/become passwords are prompted for here

# Replace the stdout callback so results go to ResultCallback instead of the console
pbex._tqm._stdout_callback = ResultCallback()
results = pbex.run()

Run it, and it works nicely:

===v2_runner_on_unreachable====host=BI-LASS-Kibana_10.60.x.x===result={'unreachable': True, 'msg': 'Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out', 'changed': False}
===v2_runner_on_unreachable====host=BI-LASS-Kibana_10.60.x.x===result={'unreachable': True, 'msg': 'Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out', 'changed': False}
===========play executes completed========
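The docstring of v2_runner_on_ok notes that results could be stored instead of printed. A sketch of that idea, kept free of Ansible imports so it runs on its own: ResultCollector (a hypothetical name) only relies on result._host.get_name() and result._result, the same attributes the callbacks above use. In real code it would subclass CallbackBase, call super().__init__() in its own __init__, and be assigned to pbex._tqm._stdout_callback; the fake_result helper exists only to demonstrate it:

```python
from collections import defaultdict
from types import SimpleNamespace


class ResultCollector:
    """Store per-host results by status instead of printing them.

    In real use: subclass ansible.plugins.callback.CallbackBase and call
    super().__init__() first in __init__.
    """

    def __init__(self):
        self.results = defaultdict(dict)  # status -> {host name: result dict}

    def _store(self, status, result):
        self.results[status][result._host.get_name()] = result._result

    def v2_runner_on_ok(self, result, **kwargs):
        self._store('ok', result)

    def v2_runner_on_failed(self, result, **kwargs):
        self._store('failed', result)

    def v2_runner_on_unreachable(self, result):
        self._store('unreachable', result)

    def v2_runner_on_skipped(self, result):
        self._store('skipped', result)


def fake_result(host, payload):
    """Hypothetical stand-in for Ansible's task result, for demonstration only."""
    return SimpleNamespace(_host=SimpleNamespace(get_name=lambda: host), _result=payload)


collector = ResultCollector()
collector.v2_runner_on_unreachable(fake_result('web1', {'unreachable': True}))
collector.v2_runner_on_ok(fake_result('web2', {'changed': False}))
print(dict(collector.results))
```

It keeps only the latest result per host and status; appending to a list instead would keep every task's result for later analysis.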
