- ansible playbook call li>
-
- find command path li>
- to find the source code and analyze li>
- simplify call li> ul>
li>
- call after the interaction of the li>
-
- ansible run analysis li>
-
- cli. The run () li>
- PlaybookExecutor. The run () li>
- TaskQueueManager. The run () li>
- strategy. The run () li>
- TaskQueueManager. Send_callback () li>
- final code
about ansible, I will not talk about science, in a word, it is necessary to do the operation and maintenance of the distributed system, to achieve the functions of batch system configuration, batch program deployment, batch run commands and so on. Ansible is a big killer, it can make you get twice the result with half the effort.
, however, as a cli tool, its usage scenario is still limited by cli, unable to achieve deeper interaction and logical control in the running process. Ansible itself is done in Python, so it is actually seamlessly linked to python’s script control and can be used directly in Python. But there’s very little of this in the documentation on the website itself, and very little on the Internet, so this post is an attempt to provide some guidance.
first of all, I’m not a python Daniel, can’t directly from the code of the line to see the entire ansible architecture and code, and official website said ansible code has been refactoring (found in the Internet from a handful of cases have proved that, the python call ansible example, in the new version of the ansbile can hardly run a), as a result, what we need is a way of thinking: from the CLI to start with strong> p>
ansible playbook call
Let’s take ansible
as an example, and show you how we can start from CLI and find a way to call ansible
in python, step by step
find the command path
$ which ansible-playbook
/Library/Frameworks/Python.framework/Versions/3.6/bin/ansible-playbook
find the source code and analyze
use the above command to find ansible-playbook. Open the file (don’t read the contents of the file) :
#!/Library/Frameworks/Python.framework/Versions/3.6/bin/python3.6
# (c) 2012, Michael DeHaan <[email protected]>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
########################################################
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
__requires__ = ['ansible']
import os
import shutil
import sys
import traceback
from ansible import context
from ansible.errors import AnsibleError, AnsibleOptionsError, AnsibleParserError
from ansible.module_utils._text import to_text
# Used for determining if the system is running a new enough python version
# and should only restrict on our documented minimum versions
_PY3_MIN = sys.version_info[:2] >= (3, 5)
_PY2_MIN = (2, 6) <= sys.version_info[:2] < (3,)
_PY_MIN = _PY3_MIN or _PY2_MIN
if not _PY_MIN:
raise SystemExit('ERROR: Ansible requires a minimum of Python2 version 2.6 or Python3 version 3.5. Current version: %s' % ''.join(sys.version.splitlines()))
class LastResort(object):
# OUTPUT OF LAST RESORT
def display(self, msg, log_only=None):
print(msg, file=sys.stderr)
def error(self, msg, wrap_text=None):
print(msg, file=sys.stderr)
if __name__ == '__main__':
display = LastResort()
try: # bad ANSIBLE_CONFIG or config options can force ugly stacktrace
import ansible.constants as C
from ansible.utils.display import Display
except AnsibleOptionsError as e:
display.error(to_text(e), wrap_text=False)
sys.exit(5)
cli = None
me = os.path.basename(sys.argv[0])
try:
display = Display()
display.debug("starting run")
sub = None
target = me.split('-')
if target[-1][0].isdigit():
# Remove any version or python version info as downstreams
# sometimes add that
target = target[:-1]
if len(target) > 1:
sub = target[1]
myclass = "%sCLI" % sub.capitalize()
elif target[0] == 'ansible':
sub = 'adhoc'
myclass = 'AdHocCLI'
else:
raise AnsibleError("Unknown Ansible alias: %s" % me)
try:
mycli = getattr(__import__("ansible.cli.%s" % sub, fromlist=[myclass]), myclass)
except ImportError as e:
# ImportError members have changed in py3
if 'msg' in dir(e):
msg = e.msg
else:
msg = e.message
if msg.endswith(' %s' % sub):
raise AnsibleError("Ansible sub-program not implemented: %s" % me)
else:
raise
try:
args = [to_text(a, errors='surrogate_or_strict') for a in sys.argv]
except UnicodeError:
display.error('Command line args are not in utf-8, unable to continue. Ansible currently only understands utf-8')
display.display(u"The full traceback was:\n\n%s" % to_text(traceback.format_exc()))
exit_code = 6
else:
cli = mycli(args)
exit_code = cli.run()
except AnsibleOptionsError as e:
cli.parser.print_help()
display.error(to_text(e), wrap_text=False)
exit_code = 5
except AnsibleParserError as e:
display.error(to_text(e), wrap_text=False)
exit_code = 4
# TQM takes care of these, but leaving comment to reserve the exit codes
# except AnsibleHostUnreachable as e:
# display.error(str(e))
# exit_code = 3
# except AnsibleHostFailed as e:
# display.error(str(e))
# exit_code = 2
except AnsibleError as e:
display.error(to_text(e), wrap_text=False)
exit_code = 1
except KeyboardInterrupt:
display.error("User interrupted execution")
exit_code = 99
except Exception as e:
if C.DEFAULT_DEBUG:
# Show raw stacktraces in debug mode, It also allow pdb to
# enter post mortem mode.
raise
have_cli_options = bool(context.CLIARGS)
display.error("Unexpected Exception, this is probably a bug: %s" % to_text(e), wrap_text=False)
if not have_cli_options or have_cli_options and context.CLIARGS['verbosity'] > 2:
log_only = False
if hasattr(e, 'orig_exc'):
display.vvv('\nexception type: %s' % to_text(type(e.orig_exc)))
why = to_text(e.orig_exc)
if to_text(e) != why:
display.vvv('\noriginal msg: %s' % why)
else:
display.display("to see the full traceback, use -vvv")
log_only = True
display.display(u"the full traceback was:\n\n%s" % to_text(traceback.format_exc()), log_only=log_only)
exit_code = 250
finally:
# Remove ansible tmpdir
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
sys.exit(exit_code)
as you can see, most of the content is exception handling, skip it and find the most important sentence:
...
if len(target) > 1:
sub = target[1]
myclass = "%sCLI" % sub.capitalize()
elif target[0] == 'ansible':
sub = 'adhoc'
myclass = 'AdHocCLI'
else:
raise AnsibleError("Unknown Ansible alias: %s" % me)
try:
mycli = getattr(__import__("ansible.cli.%s" % sub, fromlist=[myclass]), myclass)
...
cli = mycli(args)
exit_code = cli.run()
through these lines of code, we can locate the package ansible. Cli
directory. A variety of CLI is supported below :
├── adhoc.py
├── arguments
│ ├── __init__.py
│ ├── __pycache__
│ │ ├── __init__.cpython-36.pyc
│ │ └── optparse_helpers.cpython-36.pyc
│ └── optparse_helpers.py
├── config.py
├── console.py
├── doc.py
├── galaxy.py
├── inventory.py
├── playbook.py
├── pull.py
└── vault.py
and the way to call it is also very simple, just need to figure out what args is, you can simply add print(args)
.
simplifies calling
finally the above code can be simplified to the following example:
from ansible.cli.playbook import PlaybookCLI
mycli = PlaybookCLI
cli = mycli([" ",'-i', 'hosts.uat', 'kibana_deploy_plugin.yml'])
exit_code = cli.run()
note that the parameter is ['-i', 'hosts. Uat ', 'kibana_deploy_plugin.yml']
. The format is the same as we run ansible playbook
, except that is required to provide as an array.
run it, and here’s the result:
PLAY [Deploy kibana optimize] **************************************************
TASK [Gathering Facts] *********************************************************
fatal: [BI-LASS-Kibana_10.60.x.x]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out", "unreachable": true}
fatal: [BI-LASS-Kibana_10.50.x.x]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: ssh: connect to host 10.50.x.x port 22: Operation timed out", "unreachable": true}
PLAY RECAP *********************************************************************
BI-LASS-Kibana_10.60.x.x : ok=0 changed=0 unreachable=1 failed=0 skipped=0 rescued=0 ignored=0
BI-LASS-Kibana_10.50.x.x : ok=0 changed=0 unreachable=1 failed=0 skipped=0 rescued=0 ignored=0
Process finished with exit code 0
, we can see that the script is working, so at this point, we can call ansible playbook in python.
interaction after the call
is not enough, we need interaction, we need to get the results of task running, and do additional analysis and logical processing based on the results, so we need to study the code in more depth.
ansible operating analysis
cli.run()
first take a look at cli.run()
function:
def run(self):
super(PlaybookCLI, self).run()
# Note: slightly wrong, this is written so that implicit localhost
# manages passwords
sshpass = None
becomepass = None
passwords = {}
# initial error check, to make sure all specified playbooks are accessible
# before we start running anything through the playbook executor
b_playbook_dirs = []
for playbook in context.CLIARGS['args']:
if not os.path.exists(playbook):
raise AnsibleError("the playbook: %s could not be found" % playbook)
if not (os.path.isfile(playbook) or stat.S_ISFIFO(os.stat(playbook).st_mode)):
raise AnsibleError("the playbook: %s does not appear to be a file" % playbook)
b_playbook_dir = os.path.dirname(os.path.abspath(to_bytes(playbook, errors='surrogate_or_strict')))
# load plugins from all playbooks in case they add callbacks/inventory/etc
add_all_plugin_dirs(b_playbook_dir)
b_playbook_dirs.append(b_playbook_dir)
set_collection_playbook_paths(b_playbook_dirs)
# don't deal with privilege escalation or passwords when we don't need to
if not (context.CLIARGS['listhosts'] or context.CLIARGS['listtasks'] or
context.CLIARGS['listtags'] or context.CLIARGS['syntax']):
(sshpass, becomepass) = self.ask_passwords()
passwords = {'conn_pass': sshpass, 'become_pass': becomepass}
# create base objects
loader, inventory, variable_manager = self._play_prereqs()
# (which is not returned in list_hosts()) is taken into account for
# warning if inventory is empty. But it can't be taken into account for
# checking if limit doesn't match any hosts. Instead we don't worry about
# limit if only implicit localhost was in inventory to start with.
#
# Fix this when we rewrite inventory by making localhost a real host (and thus show up in list_hosts())
CLI.get_host_list(inventory, context.CLIARGS['subset'])
# flush fact cache if requested
if context.CLIARGS['flush_cache']:
self._flush_cache(inventory, variable_manager)
# create the playbook executor, which manages running the plays via a task queue manager
pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
variable_manager=variable_manager, loader=loader,
passwords=passwords)
results = pbex.run()
if isinstance(results, list):
for p in results:
display.display('\nplaybook: %s' % p['playbook'])
for idx, play in enumerate(p['plays']):
if play._included_path is not None:
loader.set_basedir(play._included_path)
else:
pb_dir = os.path.realpath(os.path.dirname(p['playbook']))
loader.set_basedir(pb_dir)
msg = "\n play #%d (%s): %s" % (idx + 1, ','.join(play.hosts), play.name)
mytags = set(play.tags)
msg += '\tTAGS: [%s]' % (','.join(mytags))
if context.CLIARGS['listhosts']:
playhosts = set(inventory.get_hosts(play.hosts))
msg += "\n pattern: %s\n hosts (%d):" % (play.hosts, len(playhosts))
for host in playhosts:
msg += "\n %s" % host
display.display(msg)
all_tags = set()
if context.CLIARGS['listtags'] or context.CLIARGS['listtasks']:
taskmsg = ''
if context.CLIARGS['listtasks']:
taskmsg = ' tasks:\n'
def _process_block(b):
taskmsg = ''
for task in b.block:
if isinstance(task, Block):
taskmsg += _process_block(task)
else:
if task.action == 'meta':
continue
all_tags.update(task.tags)
if context.CLIARGS['listtasks']:
cur_tags = list(mytags.union(set(task.tags)))
cur_tags.sort()
if task.name:
taskmsg += " %s" % task.get_name()
else:
taskmsg += " %s" % task.action
taskmsg += "\tTAGS: [%s]\n" % ', '.join(cur_tags)
return taskmsg
all_vars = variable_manager.get_vars(play=play)
for block in play.compile():
block = block.filter_tagged_tasks(all_vars)
if not block.has_tasks():
continue
taskmsg += _process_block(block)
if context.CLIARGS['listtags']:
cur_tags = list(mytags.union(all_tags))
cur_tags.sort()
taskmsg += " TASK TAGS: [%s]\n" % ', '.join(cur_tags)
display.display(taskmsg)
return 0
else:
return results
The function
is still very long, but the point is:
# create the playbook executor, which manages running the plays via a task queue manager
pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
variable_manager=variable_manager, loader=loader,
passwords=passwords)
results = pbex.run()
here the execution process is encapsulated with PlaybookExecutor
, and the task queue manager
.
PlaybookExecutor.run()
follow up PlaybookExecutor. Run ()
, we can see the key code is:
class PlaybookExecutor:
'''
This is the primary class for executing playbooks, and thus the
basis for bin/ansible-playbook operation.
'''
def __init__(self, playbooks, inventory, variable_manager, loader, passwords):
self._playbooks = playbooks
self._inventory = inventory
self._variable_manager = variable_manager
self._loader = loader
self.passwords = passwords
self._unreachable_hosts = dict()
if context.CLIARGS.get('listhosts') or context.CLIARGS.get('listtasks') or \
context.CLIARGS.get('listtags') or context.CLIARGS.get('syntax'):
self._tqm = None
else:
self._tqm = TaskQueueManager(
inventory=inventory,
variable_manager=variable_manager,
loader=loader,
passwords=self.passwords,
forks=context.CLIARGS.get('forks'),
)
...
def run(self):
'''
Run the given playbook, based on the settings in the play which
may limit the runs to serialized groups, etc.
'''
...
if self._tqm is None:
# we are just doing a listing
entry['plays'].append(play)
else:
self._tqm._unreachable_hosts.update(self._unreachable_hosts)
previously_failed = len(self._tqm._failed_hosts)
previously_unreachable = len(self._tqm._unreachable_hosts)
break_play = False
# we are actually running plays
batches = self._get_serialized_batches(play)
if len(batches) == 0:
self._tqm.send_callback('v2_playbook_on_play_start', play)
self._tqm.send_callback('v2_playbook_on_no_hosts_matched')
for batch in batches:
# restrict the inventory to the hosts in the serialized batch
self._inventory.restrict_to_hosts(batch)
# and run it...
result = self._tqm.run(play=play)
...
you can see a couple of things:
- if the parameter contains
listhosts
, listtasks
, listtags
, syntax
then it does not actually run, but returns the information for playbook
0
- 1 if it needs to run, then through
2 TaskQueueManager
3, (4)) li>
- we need to look further into the
_tq.run (play=play)
for batchforks (such concurrency is not thread-safe) li b>