Developing Pyinfra Connectors
So, Pyinfra's connectors have a few moving parts, the most important being their base class.
First things first:
Let's go piece by piece, starting with ConnectorData.
This is a Python object with a single objective: store all the data that can be defined for your Connector.
SSH connector example:
class ConnectorData(TypedDict):
    ssh_hostname: str
    ssh_port: int
    ssh_user: str
    ssh_password: str
    ssh_key: str
    ssh_key_password: str
    ssh_allow_agent: bool
    ssh_look_for_keys: bool
    ssh_forward_agent: bool
    ssh_config_file: str
    ssh_known_hosts_file: str
    ssh_strict_host_key_checking: str
    ssh_paramiko_connect_kwargs: dict
    ssh_connect_retries: int
    ssh_connect_retry_min_delay: float
    ssh_connect_retry_max_delay: float
    ssh_file_transfer_protocol: str
This gets loaded by the BaseConnector through this line in its __init__:
self.data = host_to_connector_data(self.data_cls, self.data_meta, host.data)
This translates to Ch-aOS's fleets as:
fleet:
  hosts:
    - "@ssh/my_server.net": # You can omit the "@ssh/"
        ssh_hostname: str
        ssh_port: int
        ...: ...
As you can see, it's a perfect match!
Meta shtuff
The DataMeta class is a simple self-documenting holder for descriptions and default values... quite Ch-aOtic of them to add a feature like this, huh?
class DataMeta:
    description: str
    default: Any

    def __init__(self, description, default=None) -> None:
        self.description = description
        self.default = default
Example of usage from Pyinfra's SSH connector:
connector_data_meta: dict[str, DataMeta] = {
    "ssh_hostname": DataMeta("SSH hostname"),
    "ssh_port": DataMeta("SSH port"),
    "ssh_user": DataMeta("SSH user"),
    "ssh_password": DataMeta("SSH password"),
    "ssh_key": DataMeta("SSH key filename"),
    "ssh_key_password": DataMeta("SSH key password"),
    "ssh_allow_agent": DataMeta(
        "Whether to use any active SSH agent",
        True,
    ),
    # Other ConnectorData descriptions and default values
}
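To make the relationship between ConnectorData and DataMeta more concrete, here is a minimal sketch of how the two could be combined into a connector's final data. The MyToolData keys and the resolve_connector_data helper are hypothetical stand-ins written for this page; Pyinfra's own host_to_connector_data may differ in its details.

```python
from typing import Any, TypedDict


class DataMeta:
    """Same shape as the DataMeta class shown above."""

    def __init__(self, description: str, default: Any = None) -> None:
        self.description = description
        self.default = default


class MyToolData(TypedDict, total=False):
    # Hypothetical keys for an imaginary "mytool" connector.
    mytool_hostname: str
    mytool_port: int
    mytool_use_tls: bool


my_tool_data_meta: dict[str, DataMeta] = {
    "mytool_hostname": DataMeta("Hostname to connect to"),
    "mytool_port": DataMeta("Port to connect to", 8080),
    "mytool_use_tls": DataMeta("Whether to use TLS", True),
}


def resolve_connector_data(
    data_cls: type,
    data_meta: dict[str, DataMeta],
    host_data: dict[str, Any],
) -> dict[str, Any]:
    """Hypothetical helper: host data wins, DataMeta defaults fill the gaps."""
    resolved = {}
    for key in data_cls.__annotations__:
        resolved[key] = host_data.get(key, data_meta[key].default)
    return resolved


# Only the hostname comes from the host; the rest falls back to the defaults.
data = resolve_connector_data(
    MyToolData,
    my_tool_data_meta,
    {"mytool_hostname": "my_server.net"},
)
```

The point of the sketch is the lookup order: a value set on the host takes priority, and anything missing falls back to the default declared in its DataMeta.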
The juicy part
Now... the BaseConnector... this is the most complex part of all, but it is quite important.
This is the complete object:
class BaseConnector(abc.ABC):
    state: "State"
    host: "Host"

    handles_execution = False

    data_cls: Type = ConnectorData
    data_meta: dict[str, DataMeta] = {}

    def __init__(self, state: "State", host: "Host"):
        self.state = state
        self.host = host
        self.data = host_to_connector_data(self.data_cls, self.data_meta, host.data)

    @staticmethod
    @abc.abstractmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        """
        Generate inventory targets. This is a staticmethod because each yield will become a new host
        object with a new (ie not this) instance of the connector.
        """

    def connect(self) -> None:
        """
        Connect this connector instance. Should raise ConnectError exceptions to indicate failure.
        """

    def disconnect(self) -> None:
        """
        Disconnect this connector instance.
        """

    @abc.abstractmethod
    def run_shell_command(
        self,
        command: "StringCommand",
        print_output: bool,
        print_input: bool,
        **arguments: Unpack["ConnectorArguments"],
    ) -> tuple[bool, "CommandOutput"]:
        """
        Execute a command.

        Args:
            command (StringCommand): actual command to execute
            print_output (bool): whether to print command output
            print_input (bool): whether to print command input
            arguments: (ConnectorArguments): connector global arguments

        Returns:
            tuple: (bool, CommandOutput)
            Bool indicating success and CommandOutput with stdout/stderr lines.
        """

    @abc.abstractmethod
    def put_file(
        self,
        filename_or_io: Union[str, IOBase],
        remote_filename: str,
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> bool:
        """
        Upload a local file or IO object by copying it to a temporary directory
        and then writing it to the upload location.

        Returns:
            bool: indicating success or failure.
        """

    @abc.abstractmethod
    def get_file(
        self,
        remote_filename: str,
        filename_or_io: Union[str, IOBase],
        remote_temp_filename: Optional[str] = None,
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> bool:
        """
        Download a local file by copying it to a temporary location and then writing
        it to our filename or IO object.

        Returns:
            bool: indicating success or failure.
        """

    def check_can_rsync(self) -> None:
        raise NotImplementedError("This connector does not support rsync")

    def rsync(
        self,
        src: str,
        dest: str,
        flags: Iterable[str],
        print_output: bool = False,
        print_input: bool = False,
        **arguments: Unpack["ConnectorArguments"],
    ) -> bool:
        raise NotImplementedError("This connector does not support rsync")
As you can see... quite a lot of code. Luckily for you (not me), I've spent my time reading Pyinfra's source code extensively!
state and host: you don't need to touch these.
handles_execution: a simple bool that answers the question "Can your connector handle command execution?". Quite important for different systems.
data_cls: again, just the data for your connector.
data_meta: a dict of DataMetas to document your data and set default values.
First method: make_names_data is like Provider's get_cli_name. For each host it yields a tuple of three things: the tagged host name, the host data, and a list of group names.
Pyinfra's ssh example:
@override
@staticmethod
def make_names_data(name):
    # Transforms the name part into "@ssh/{name}" and sets "ssh_hostname" to {name}
    yield "@ssh/{0}".format(name), {"ssh_hostname": name}, []
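Since make_names_data is a generator, one inventory name can expand into several hosts. The sketch below illustrates that with a hypothetical "@mytool" connector that accepts a comma-separated name; the class, the tag, the data key and the group name are all assumptions made for illustration, and the class deliberately does not subclass BaseConnector so that it runs without Pyinfra installed.

```python
from typing import Iterator


class MyToolConnector:
    # Hypothetical connector; a real one would subclass pyinfra's BaseConnector.

    @staticmethod
    def make_names_data(name: str) -> Iterator[tuple[str, dict, list[str]]]:
        # Assumption for this sketch: "name" may hold several comma-separated hosts.
        for hostname in name.split(","):
            hostname = hostname.strip()
            if not hostname:
                continue
            # Each yield is (host name, host data, group names).
            yield (
                "@mytool/{0}".format(hostname),
                {"mytool_hostname": hostname},
                ["mytool"],
            )


targets = list(MyToolConnector.make_names_data("alpha.example, beta.example"))
```

Each tuple in targets would become its own host, with its own connector instance.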
Some of the other methods (connect, disconnect) are well documented by their docstrings above.
run_shell_command has one quirk: **arguments. This receives the ConnectorArguments, which are the basic global arguments (_sudo, _doas, etc.), and you can handle them however you please; Pyinfra's SSH connector uses lines like _stdin = arguments.pop("_stdin", None). The method should return a bool indicating success together with a CommandOutput object, which is, again, a simple object that exposes your connector's stdout and stderr, both as lists of lines and as a single, unified string.
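As a rough illustration of that pop pattern, here is a minimal sketch that runs a command locally with the standard library. The run_local_command function is hypothetical, and it returns plain lists of lines instead of a real CommandOutput so that it stays runnable without Pyinfra; only the argument names (_stdin, _timeout, _success_exit_codes) are taken from this page.

```python
import subprocess
import sys


def run_local_command(
    command: list[str],
    print_output: bool = False,
    **arguments,
) -> tuple[bool, list[str], list[str]]:
    # Pop the global arguments this sketch understands. A real connector
    # receives them as ConnectorArguments and returns a CommandOutput.
    stdin = arguments.pop("_stdin", None)
    timeout = arguments.pop("_timeout", None)
    success_exit_codes = arguments.pop("_success_exit_codes", [0])

    result = subprocess.run(
        command,
        input=stdin,
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    stdout_lines = result.stdout.splitlines()
    stderr_lines = result.stderr.splitlines()

    if print_output:
        for line in stdout_lines:
            print(line)

    return result.returncode in success_exit_codes, stdout_lines, stderr_lines


# Feed "hello" on stdin to a tiny Python child process that upper-cases it.
ok, out, err = run_local_command(
    [sys.executable, "-c", "import sys; print(sys.stdin.read().upper())"],
    _stdin="hello",
)
```

Popping with a default keeps the function tolerant of callers that pass only some of the global arguments.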
The same handling of **arguments should apply across all the other methods, up to check_can_rsync and rsync which... well, they do exactly what the name suggests.
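As for put_file, its docstring describes a two-step upload: copy to a temporary location first, then write to the final destination. A minimal sketch of that idea on the local filesystem could look like this; put_file_locally is a hypothetical helper, not Pyinfra code, and a real connector would do the equivalent on the remote side.

```python
import io
import os
import shutil
import tempfile
from typing import IO, Union


def put_file_locally(filename_or_io: Union[str, IO[bytes]], remote_filename: str) -> bool:
    """Hypothetical helper: stage the data in a temp file, then move it into place."""
    target_dir = os.path.dirname(os.path.abspath(remote_filename))
    temp_path = None
    try:
        # Stage in the destination directory so the final move stays on one filesystem.
        fd, temp_path = tempfile.mkstemp(dir=target_dir)
        with os.fdopen(fd, "wb") as temp_file:
            if isinstance(filename_or_io, str):
                with open(filename_or_io, "rb") as source:
                    shutil.copyfileobj(source, temp_file)
            else:
                shutil.copyfileobj(filename_or_io, temp_file)
        # Only replace the destination once the whole payload has been written.
        os.replace(temp_path, remote_filename)
        return True
    except OSError:
        # Clean up the staged file and report failure, like put_file's bool result.
        if temp_path is not None and os.path.exists(temp_path):
            os.remove(temp_path)
        return False
```

Staging first means a failed or interrupted transfer never leaves a half-written file at the destination.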
I strongly recommend checking the Connectors inside of Pyinfra's source code; they can teach you quite a lot. I also recommend looking at their documentation for more details and examples!
Quirks, Tips and Shticks worth pointing out
Firstly: when a connection fails, make sure to raise a ConnectError exception; Pyinfra will catch it and handle it properly.
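A minimal sketch of that translation, assuming a plain TCP connection: connect_or_raise is a hypothetical helper that a connector's connect() could call. ConnectError is imported from pyinfra.api.exceptions when Pyinfra is available, with a stand-in class so the sketch still runs without it.

```python
import socket

try:
    from pyinfra.api.exceptions import ConnectError
except ImportError:
    # Stand-in so the sketch runs without pyinfra installed.
    class ConnectError(Exception):
        pass


def connect_or_raise(hostname: str, port: int, timeout: float = 1.0) -> socket.socket:
    """Hypothetical helper a connector's connect() could call."""
    try:
        return socket.create_connection((hostname, port), timeout=timeout)
    except OSError as error:
        # Translate low-level failures into the exception Pyinfra expects.
        raise ConnectError(
            "Could not connect to {0}:{1}: {2}".format(hostname, port, error)
        ) from error
```

Catching OSError covers refused connections, timeouts and name resolution failures in one place, so Pyinfra only ever sees ConnectError.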
Secondly: make_names_data is a staticmethod, meaning you can't access self or any instance variables inside of it. This is because each yield will create a new instance of your connector.
To "get" data inside of make_names_data, you need to rely on functions or class attributes defined outside the instance, as documented in their official docs:
def load_settings():
    settings = {}
    # logic here
    return settings

class InventoryConnector(BaseConnector):
    api_instance = external.ApiClient()
    ...

    @staticmethod
    def make_names_data(_=None):
        api_client = getattr(InventoryConnector, 'api_instance')
        api_settings = load_settings()
        ...
Thirdly: in run_shell_command, you should use the official functions make_unix_command_for_host() and extract_control_commands() from pyinfra.connectors.util to properly handle control commands like _sudo, _doas, etc. Official example:
control_commands = extract_control_commands(arguments) # returns a dict of control commands
wrapped_command = make_unix_command_for_host(
    self.state,
    self.host,
    command,
    **arguments,  # Be sure to pass ARGUMENTS, not control_commands
)
timeout = control_commands.get("_timeout")
success_exit_codes = control_commands.get("_success_exit_codes", [0])
exit_code, output = self._execute(wrapped_command, timeout=timeout)
success = exit_code in success_exit_codes
return success, output
This is because Pyinfra has a lot of built-in logic to handle these control commands, and you should leverage that instead of re-inventing the wheel.
It also parses commands correctly, which ensures that they run as intended on the target system.
How are they different from Ch-aOS's Boats?
Well, quite a lot, actually. Boats provide you with dynamic inventories, while Pyinfra Connectors provide you with connection methods; these are quite different things. The two can be used together in harmony: Boats can provide dynamic inventories that use different Connectors for different hosts, giving you a lot of flexibility and power!