article directory
-
- ansible playbook call li>
-
- find command path li>
- to find the source code and analyze li>
- simplify call li> ul>
li> - call after the interaction of the li>
-
- ansible run analysis li>
-
- cli. The run () li>
- PlaybookExecutor. The run () li>
- TaskQueueManager. The run () li>
- strategy. The run () li>
- TaskQueueManager. Send_callback () li>
- final code
about ansible, I will not talk about science, in a word, it is necessary to do the operation and maintenance of the distributed system, to achieve the functions of batch system configuration, batch program deployment, batch run commands and so on. Ansible is a big killer, it can make you get twice the result with half the effort.
, however, as a cli tool, its usage scenario is still limited by cli, unable to achieve deeper interaction and logical control in the running process. Ansible itself is done in Python, so it is actually seamlessly linked to python’s script control and can be used directly in Python. But there’s very little of this in the documentation on the website itself, and very little on the Internet, so this post is an attempt to provide some guidance.
first of all, I’m not a python Daniel, can’t directly from the code of the line to see the entire ansible architecture and code, and official website said ansible code has been refactoring (found in the Internet from a handful of cases have proved that, the python call ansible example, in the new version of the ansbile can hardly run a), as a result, what we need is a way of thinking: from the CLI to start with strong> p>
ansible playbook call
Let’s take ansibleas an example, and show you how we can start from CLI and find a way to call ansible
in python, step by step
find the command path
$ which ansible-playbook /Library/Frameworks/Python.framework/Versions/3.6/bin/ansible-playbook
find the source code and analyze
use the above command to find ansible-playbook. Open the file (don’t read the contents of the file) :
#!/Library/Frameworks/Python.framework/Versions/3.6/bin/python3.6 # (c) 2012, Michael DeHaan <[email protected]> # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>. ######################################################## from __future__ import (absolute_import, division, print_function) __metaclass__ = type __requires__ = ['ansible'] import os import shutil import sys import traceback from ansible import context from ansible.errors import AnsibleError, AnsibleOptionsError, AnsibleParserError from ansible.module_utils._text import to_text # Used for determining if the system is running a new enough python version # and should only restrict on our documented minimum versions _PY3_MIN = sys.version_info[:2] >= (3, 5) _PY2_MIN = (2, 6) <= sys.version_info[:2] < (3,) _PY_MIN = _PY3_MIN or _PY2_MIN if not _PY_MIN: raise SystemExit('ERROR: Ansible requires a minimum of Python2 version 2.6 or Python3 version 3.5. Current version: %s' % ''.join(sys.version.splitlines())) class LastResort(object): # OUTPUT OF LAST RESORT def display(self, msg, log_only=None): print(msg, file=sys.stderr) def error(self, msg, wrap_text=None): print(msg, file=sys.stderr) if __name__ == '__main__': display = LastResort() try: # bad ANSIBLE_CONFIG or config options can force ugly stacktrace import ansible.constants as C from ansible.utils.display import Display except AnsibleOptionsError as e: display.error(to_text(e), wrap_text=False) sys.exit(5) cli = None me = os.path.basename(sys.argv[0]) try: display = Display() display.debug("starting run") sub = None target = me.split('-') if target[-1][0].isdigit(): # Remove any version or python version info as downstreams # sometimes add that target = target[:-1] if len(target) > 1: sub = target[1] myclass = "%sCLI" % sub.capitalize() elif target[0] == 'ansible': sub = 'adhoc' myclass = 'AdHocCLI' else: raise AnsibleError("Unknown Ansible alias: %s" % me) try: mycli = getattr(__import__("ansible.cli.%s" % sub, fromlist=[myclass]), myclass) except ImportError as e: # ImportError members have changed in py3 if 'msg' in dir(e): msg = e.msg else: msg = e.message if msg.endswith(' %s' % sub): raise AnsibleError("Ansible sub-program not implemented: %s" % me) else: raise try: args = [to_text(a, errors='surrogate_or_strict') for a in sys.argv] except UnicodeError: display.error('Command line args are not in utf-8, unable to continue. Ansible currently only understands utf-8') display.display(u"The full traceback was:\n\n%s" % to_text(traceback.format_exc())) exit_code = 6 else: cli = mycli(args) exit_code = cli.run() except AnsibleOptionsError as e: cli.parser.print_help() display.error(to_text(e), wrap_text=False) exit_code = 5 except AnsibleParserError as e: display.error(to_text(e), wrap_text=False) exit_code = 4 # TQM takes care of these, but leaving comment to reserve the exit codes # except AnsibleHostUnreachable as e: # display.error(str(e)) # exit_code = 3 # except AnsibleHostFailed as e: # display.error(str(e)) # exit_code = 2 except AnsibleError as e: display.error(to_text(e), wrap_text=False) exit_code = 1 except KeyboardInterrupt: display.error("User interrupted execution") exit_code = 99 except Exception as e: if C.DEFAULT_DEBUG: # Show raw stacktraces in debug mode, It also allow pdb to # enter post mortem mode. raise have_cli_options = bool(context.CLIARGS) display.error("Unexpected Exception, this is probably a bug: %s" % to_text(e), wrap_text=False) if not have_cli_options or have_cli_options and context.CLIARGS['verbosity'] > 2: log_only = False if hasattr(e, 'orig_exc'): display.vvv('\nexception type: %s' % to_text(type(e.orig_exc))) why = to_text(e.orig_exc) if to_text(e) != why: display.vvv('\noriginal msg: %s' % why) else: display.display("to see the full traceback, use -vvv") log_only = True display.display(u"the full traceback was:\n\n%s" % to_text(traceback.format_exc()), log_only=log_only) exit_code = 250 finally: # Remove ansible tmpdir shutil.rmtree(C.DEFAULT_LOCAL_TMP, True) sys.exit(exit_code)
as you can see, most of the content is exception handling, skip it and find the most important sentence:
... if len(target) > 1: sub = target[1] myclass = "%sCLI" % sub.capitalize() elif target[0] == 'ansible': sub = 'adhoc' myclass = 'AdHocCLI' else: raise AnsibleError("Unknown Ansible alias: %s" % me) try: mycli = getattr(__import__("ansible.cli.%s" % sub, fromlist=[myclass]), myclass) ... cli = mycli(args) exit_code = cli.run()
through these lines of code, we can locate the package
ansible. Cli
directory. A variety of CLI is supported below :├── adhoc.py ├── arguments │ ├── __init__.py │ ├── __pycache__ │ │ ├── __init__.cpython-36.pyc │ │ └── optparse_helpers.cpython-36.pyc │ └── optparse_helpers.py ├── config.py ├── console.py ├── doc.py ├── galaxy.py ├── inventory.py ├── playbook.py ├── pull.py └── vault.py
and the way to call it is also very simple, just need to figure out what args is, you can simply add
print(args)
.simplifies calling
finally the above code can be simplified to the following example:
from ansible.cli.playbook import PlaybookCLI mycli = PlaybookCLI cli = mycli([" ",'-i', 'hosts.uat', 'kibana_deploy_plugin.yml']) exit_code = cli.run()
note that the parameter is
['-i', 'hosts. Uat ', 'kibana_deploy_plugin.yml']
. The format is the same as we runansible playbook
, except that is required to provide as an array.run it, and here’s the result:
PLAY [Deploy kibana optimize] ************************************************** TASK [Gathering Facts] ********************************************************* fatal: [BI-LASS-Kibana_10.60.x.x]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out", "unreachable": true} fatal: [BI-LASS-Kibana_10.50.x.x]: UNREACHABLE! => {"changed": false, "msg": "Failed to connect to the host via ssh: ssh: connect to host 10.50.x.x port 22: Operation timed out", "unreachable": true} PLAY RECAP ********************************************************************* BI-LASS-Kibana_10.60.x.x : ok=0 changed=0 unreachable=1 failed=0 skipped=0 rescued=0 ignored=0 BI-LASS-Kibana_10.50.x.x : ok=0 changed=0 unreachable=1 failed=0 skipped=0 rescued=0 ignored=0 Process finished with exit code 0
, we can see that the script is working, so at this point, we can call ansible playbook in python.
interaction after the call
is not enough, we need interaction, we need to get the results of task running, and do additional analysis and logical processing based on the results, so we need to study the code in more depth.
ansible operating analysis
cli.run()
first take a look at
cli.run()
function:def run(self): super(PlaybookCLI, self).run() # Note: slightly wrong, this is written so that implicit localhost # manages passwords sshpass = None becomepass = None passwords = {} # initial error check, to make sure all specified playbooks are accessible # before we start running anything through the playbook executor b_playbook_dirs = [] for playbook in context.CLIARGS['args']: if not os.path.exists(playbook): raise AnsibleError("the playbook: %s could not be found" % playbook) if not (os.path.isfile(playbook) or stat.S_ISFIFO(os.stat(playbook).st_mode)): raise AnsibleError("the playbook: %s does not appear to be a file" % playbook) b_playbook_dir = os.path.dirname(os.path.abspath(to_bytes(playbook, errors='surrogate_or_strict'))) # load plugins from all playbooks in case they add callbacks/inventory/etc add_all_plugin_dirs(b_playbook_dir) b_playbook_dirs.append(b_playbook_dir) set_collection_playbook_paths(b_playbook_dirs) # don't deal with privilege escalation or passwords when we don't need to if not (context.CLIARGS['listhosts'] or context.CLIARGS['listtasks'] or context.CLIARGS['listtags'] or context.CLIARGS['syntax']): (sshpass, becomepass) = self.ask_passwords() passwords = {'conn_pass': sshpass, 'become_pass': becomepass} # create base objects loader, inventory, variable_manager = self._play_prereqs() # (which is not returned in list_hosts()) is taken into account for # warning if inventory is empty. But it can't be taken into account for # checking if limit doesn't match any hosts. Instead we don't worry about # limit if only implicit localhost was in inventory to start with. # # Fix this when we rewrite inventory by making localhost a real host (and thus show up in list_hosts()) CLI.get_host_list(inventory, context.CLIARGS['subset']) # flush fact cache if requested if context.CLIARGS['flush_cache']: self._flush_cache(inventory, variable_manager) # create the playbook executor, which manages running the plays via a task queue manager pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory, variable_manager=variable_manager, loader=loader, passwords=passwords) results = pbex.run() if isinstance(results, list): for p in results: display.display('\nplaybook: %s' % p['playbook']) for idx, play in enumerate(p['plays']): if play._included_path is not None: loader.set_basedir(play._included_path) else: pb_dir = os.path.realpath(os.path.dirname(p['playbook'])) loader.set_basedir(pb_dir) msg = "\n play #%d (%s): %s" % (idx + 1, ','.join(play.hosts), play.name) mytags = set(play.tags) msg += '\tTAGS: [%s]' % (','.join(mytags)) if context.CLIARGS['listhosts']: playhosts = set(inventory.get_hosts(play.hosts)) msg += "\n pattern: %s\n hosts (%d):" % (play.hosts, len(playhosts)) for host in playhosts: msg += "\n %s" % host display.display(msg) all_tags = set() if context.CLIARGS['listtags'] or context.CLIARGS['listtasks']: taskmsg = '' if context.CLIARGS['listtasks']: taskmsg = ' tasks:\n' def _process_block(b): taskmsg = '' for task in b.block: if isinstance(task, Block): taskmsg += _process_block(task) else: if task.action == 'meta': continue all_tags.update(task.tags) if context.CLIARGS['listtasks']: cur_tags = list(mytags.union(set(task.tags))) cur_tags.sort() if task.name: taskmsg += " %s" % task.get_name() else: taskmsg += " %s" % task.action taskmsg += "\tTAGS: [%s]\n" % ', '.join(cur_tags) return taskmsg all_vars = variable_manager.get_vars(play=play) for block in play.compile(): block = block.filter_tagged_tasks(all_vars) if not block.has_tasks(): continue taskmsg += _process_block(block) if context.CLIARGS['listtags']: cur_tags = list(mytags.union(all_tags)) cur_tags.sort() taskmsg += " TASK TAGS: [%s]\n" % ', '.join(cur_tags) display.display(taskmsg) return 0 else: return results
The function
is still very long, but the point is:
# create the playbook executor, which manages running the plays via a task queue manager pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory, variable_manager=variable_manager, loader=loader, passwords=passwords) results = pbex.run()
here the execution process is encapsulated with
PlaybookExecutor
, and thetask queue manager
.PlaybookExecutor.run()
follow up
PlaybookExecutor. Run ()
, we can see the key code is:class PlaybookExecutor: ''' This is the primary class for executing playbooks, and thus the basis for bin/ansible-playbook operation. ''' def __init__(self, playbooks, inventory, variable_manager, loader, passwords): self._playbooks = playbooks self._inventory = inventory self._variable_manager = variable_manager self._loader = loader self.passwords = passwords self._unreachable_hosts = dict() if context.CLIARGS.get('listhosts') or context.CLIARGS.get('listtasks') or \ context.CLIARGS.get('listtags') or context.CLIARGS.get('syntax'): self._tqm = None else: self._tqm = TaskQueueManager( inventory=inventory, variable_manager=variable_manager, loader=loader, passwords=self.passwords, forks=context.CLIARGS.get('forks'), ) ... def run(self): ''' Run the given playbook, based on the settings in the play which may limit the runs to serialized groups, etc. ''' ... if self._tqm is None: # we are just doing a listing entry['plays'].append(play) else: self._tqm._unreachable_hosts.update(self._unreachable_hosts) previously_failed = len(self._tqm._failed_hosts) previously_unreachable = len(self._tqm._unreachable_hosts) break_play = False # we are actually running plays batches = self._get_serialized_batches(play) if len(batches) == 0: self._tqm.send_callback('v2_playbook_on_play_start', play) self._tqm.send_callback('v2_playbook_on_no_hosts_matched') for batch in batches: # restrict the inventory to the hosts in the serialized batch self._inventory.restrict_to_hosts(batch) # and run it... result = self._tqm.run(play=play) ...
you can see a couple of things:
- if the parameter contains
listhosts
,listtasks
,listtags
,syntax
then it does not actually run, but returns the information for playbook - 1 if it needs to run, then through
2 TaskQueueManager
3, (4)) li> - we need to look further into the
_tq.run (play=play)
0
for batchforks (such concurrency is not thread-safe) li b>
-
TaskQueueManager.run()
a few things can be seen from the following code:
The default strategy is linear strategy
- . Once the task has been completed on all hosts, it will proceed to the next task
def run(self, play):
'''
Iterates over the roles/tasks in a play, using the given (or default)
strategy for queueing tasks. The default is the linear strategy, which
operates like classic Ansible by keeping all hosts in lock-step with
a given task (meaning no hosts move on to the next task until all hosts
are done with the current task).
'''
if not self._callbacks_loaded:
self.load_callbacks()
all_vars = self._variable_manager.get_vars(play=play)
warn_if_reserved(all_vars)
templar = Templar(loader=self._loader, variables=all_vars)
new_play = play.copy()
new_play.post_validate(templar)
new_play.handlers = new_play.compile_roles_handlers() + new_play.handlers
self.hostvars = HostVars(
inventory=self._inventory,
variable_manager=self._variable_manager,
loader=self._loader,
)
play_context = PlayContext(new_play, self.passwords, self._connection_lockfile.fileno())
if (self._stdout_callback and
hasattr(self._stdout_callback, 'set_play_context')):
self._stdout_callback.set_play_context(play_context)
for callback_plugin in self._callback_plugins:
if hasattr(callback_plugin, 'set_play_context'):
callback_plugin.set_play_context(play_context)
self.send_callback('v2_playbook_on_play_start', new_play)
# build the iterator
iterator = PlayIterator(
inventory=self._inventory,
play=new_play,
play_context=play_context,
variable_manager=self._variable_manager,
all_vars=all_vars,
start_at_done=self._start_at_done,
)
# adjust to # of workers to configured forks or size of batch, whatever is lower
self._initialize_processes(min(self._forks, iterator.batch_size))
# load the specified strategy (or the default linear one)
strategy = strategy_loader.get(new_play.strategy, self)
if strategy is None:
raise AnsibleError("Invalid play strategy specified: %s" % new_play.strategy, obj=play._ds)
# Because the TQM may survive multiple play runs, we start by marking
# any hosts as failed in the iterator here which may have been marked
# as failed in previous runs. Then we clear the internal list of failed
# hosts so we know what failed this round.
for host_name in self._failed_hosts.keys():
host = self._inventory.get_host(host_name)
iterator.mark_host_failed(host)
self.clear_failed_hosts()
# during initialization, the PlayContext will clear the start_at_task
# field to signal that a matching task was found, so check that here
# and remember it so we don't try to skip tasks on future plays
if context.CLIARGS.get('start_at_task') is not None and play_context.start_at_task is None:
self._start_at_done = True
# and run the play using the strategy and cleanup on way out
play_return = strategy.run(iterator, play_context)
# now re-save the hosts that failed from the iterator to our internal list
for host_name in iterator.get_failed_hosts():
self._failed_hosts[host_name] = True
strategy.cleanup()
self._cleanup_processes()
return play_return
therefore, we still need to go to strategy.run()
to find out
strategy.run()
through this part of the code
def run(self, iterator, play_context):
'''
The linear strategy is simple - get the next task and queue
it for all hosts, then wait for the queue to drain before
moving on to the next task
'''
# iterate over each task, while there is one left to run
result = self._tqm.RUN_OK
work_to_do = True
while work_to_do and not self._tqm._terminated:
try:
display.debug("getting the remaining hosts for this loop")
hosts_left = self.get_hosts_left(iterator)
display.debug("done getting the remaining hosts for this loop")
# queue up this task for each host in the inventory
callback_sent = False
work_to_do = False
host_results = []
host_tasks = self._get_next_task_lockstep(hosts_left, iterator)
...
results += self._process_pending_results(iterator, max_passes=max(1, int(len(self._tqm._workers) * 0.1)))
...
return super(StrategyModule, self).run(iterator, play_context, result)
(this function is too long, I omitted most of it)
the final line here is results += self._process_pending_results(iterator, max_process_pending_results = Max (1, int(len(self._tqm._workers) * 0.1))
, that is, we process through the _process_pending_results
function, and we use a lot of callbacks:
if task_result.is_failed() or task_result.is_unreachable():
self._tqm.send_callback('v2_runner_item_on_failed', task_result)
elif task_result.is_skipped():
self._tqm.send_callback('v2_runner_item_on_skipped', task_result)
else:
if 'diff' in task_result._result:
if self._diff or getattr(original_task, 'diff', False):
self._tqm.send_callback('v2_on_file_diff', task_result)
self._tqm.send_callback('v2_runner_item_on_ok', task_result)
The
callbacks are from the taskQueueManager
TaskQueueManager.send_callback()
from the following code, we can see that the callback can come from _stdout_callback
def send_callback(self, method_name, *args, **kwargs):
for callback_plugin in [self._stdout_callback] + self._callback_plugins:
# a plugin that set self.disabled to True will not be called
# see osx_say.py example for such a plugin
if getattr(callback_plugin, 'disabled', False):
continue
# try to find v2 method, fallback to v1 method, ignore callback if no method found
methods = []
for possible in [method_name, 'v2_on_any']:
gotit = getattr(callback_plugin, possible, None)
if gotit is None:
gotit = getattr(callback_plugin, possible.replace('v2_', ''), None)
if gotit is not None:
methods.append(gotit)
Therefore, as long as we can override the stdout_callback of the TaskQueueManager, we can get the intermediate result
final code
omits the intermediate analysis step and goes directly to the code
from ansible.cli.playbook import PlaybookCLI
from ansible.plugins.callback import CallbackBase
import json
from ansible.cli import CLI
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible import context
from ansible import constants as C
class ResultCallback(CallbackBase):
"""A sample callback plugin used for performing an action as results come in
If you want to collect all results into a single object for processing at
the end of the execution, look into utilizing the ``json`` callback plugin
or writing your own custom callback plugin
"""
def v2_runner_on_ok(self, result, **kwargs):
"""Print a json representation of the result
This method could store the result in an instance attribute for retrieval later
"""
host = result._host
print(json.dumps({host.name: result._result}, indent=4))
def v2_runner_on_failed(self, result, **kwargs):
host = result._host.get_name()
self.runner_on_failed(host, result._result, False)
print('===v2_runner_on_failed====host=%s===result=%s'%(host,result._result))
def v2_runner_on_unreachable(self, result):
host = result._host.get_name()
self.runner_on_unreachable(host, result._result)
print('===v2_runner_on_unreachable====host=%s===result=%s'%(host,result._result))
def v2_runner_on_skipped(self, result):
if C.DISPLAY_SKIPPED_HOSTS:
host = result._host.get_name()
self.runner_on_skipped(host, self._get_item(getattr(result._result,'results',{})))
print("this task does not execute,please check parameter or condition.")
def v2_playbook_on_stats(self, stats):
print('===========play executes completed========')
cli = PlaybookCLI([" ",'-i', 'hosts.uat', 'kibana_deploy_plugin.yml'])
super(PlaybookCLI,cli).run()
loader, inventory, variable_manager = cli._play_prereqs()
CLI.get_host_list(inventory, context.CLIARGS['subset'])
pbex = PlaybookExecutor(playbooks=context.CLIARGS['args'], inventory=inventory,
variable_manager=variable_manager, loader=loader,
passwords=None)
pbex._tqm._stdout_callback = ResultCallback()
pbex.run()
run it, so nice…
===v2_runner_on_unreachable====host=BI-LASS-Kibana_10.60.x.x===result={'unreachable': True, 'msg': 'Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out', 'changed': False}
===v2_runner_on_unreachable====host=BI-LASS-Kibana_10.60.x.x===result={'unreachable': True, 'msg': 'Failed to connect to the host via ssh: ssh: connect to host 10.60.x.x port 22: Operation timed out', 'changed': False}
===========play executes completed========
div>
Read More:
- The Python DOM method iterates over all the XML in a folder
- Python recursively traverses all files in the directory to find the specified file
- Pychar can’t connect to Python console, but it can run. Py file, and Anaconda’s command line can run Python command
- Python traverses all files under the specified path and retrieves them according to the time interval
- Python automatically generates the requirements file for the current project
- Python: How to Reshape the data in Pandas DataFrame
- Change the Python installation path in Pycharm
- Python classes that connect to the database
- Python+ Pandas + Evaluation of Music Equipment over the years (Notes)
- [Solved] ansible Command Error: Error -5 while decompressing data: incomplete or truncated stream
- Autograd error in Python: runtimeerror: grad can be implicitly created only for scalar outputs
- Python3.6 Run uvicorn Error: AttributeError: module ‘asyncio‘ has no attribute ‘run‘
- Python Error: SyntaxError: (unicode error) ‘unicodeescape‘ codec can‘t decode bytes in position 2-3:
- [Solved] AttributeError: ‘module‘ object has no attribute ‘CALIB_HAND_EYE_PARK‘
- can‘t multiply sequence by non-int of type ‘numpy.float64‘
- [Solved] PyCharm pytest-html Install Error: Try to run this command from the system terminal. Make sure that you use
- Python errors: valueerror: if using all scalar values, you must pass an index (four solutions)
- [Solved] python opencv Error: findContours() Can only use the Grayscale
- Python server run code ModuleNotFoundError Error [How to Solve]
- [Solved] NPM install Error: check python checking for Python executable python2 in the PATH