Add a new connector
This guide explains how to create a new connector for Ploosh. A connector is a Python class that fetches data from a source and returns it as a DataFrame.
Architecture overview
Ploosh discovers connectors automatically at startup. The __init__.py file in src/ploosh/connectors/ scans all files matching the pattern connector_*.py, imports them, and registers every class whose name starts with Connector.
There is no registry to update and no configuration to change: simply creating a correctly named file is enough.
src/ploosh/connectors/
├── connector.py               # Base class
├── connector_csv.py           # Example: native connector
├── connector_csv_spark.py     # Example: Spark connector
├── connector_<your_name>.py   # ← Your new connector
└── __init__.py                # Auto-discovery logic
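The discovery step described above can be sketched as follows. This is an illustration only, not the actual contents of Ploosh's __init__.py: the function name discover_connectors, the throwaway demo folder, and the rule that skips classes imported from other modules are all assumptions.

```python
import importlib.util
import inspect
import tempfile
from pathlib import Path


def discover_connectors(folder: Path) -> dict:
    """Import every connector_*.py file and collect classes named Connector*."""
    found = {}
    for path in sorted(folder.glob("connector_*.py")):
        # Load the file as a module without requiring it to be on sys.path
        spec = importlib.util.spec_from_file_location(path.stem, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        for class_name, cls in inspect.getmembers(module, inspect.isclass):
            # Assumption: keep only classes defined in this file,
            # so an imported base class is not registered twice
            defined_here = cls.__module__ == module.__name__
            if class_name.startswith("Connector") and defined_here:
                found[class_name] = cls
    return found


# Demo: a throwaway folder with one hypothetical connector file
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp)
    (demo / "connector_foobar.py").write_text(
        "class ConnectorFooBar:\n    name = 'FOOBAR'\n"
    )
    # Does not match connector_*.py, so it is never imported
    (demo / "helpers.py").write_text("class ConnectorIgnored:\n    pass\n")
    connectors = discover_connectors(demo)

print(sorted(connectors))
```

Only the file matching connector_*.py is imported, which is why the file name matters as much as the class name.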
Step 1 — Create the connector file
Create a new file in src/ploosh/connectors/ with the naming convention:
| Type | File name |
|---|---|
| Native (Pandas) | connector_<name>.py |
| Spark | connector_<name>_spark.py |
Step 2 — Implement the class
Every connector extends the Connector base class and must:
- Set name: the identifier used in YAML files (type:)
- Set connection_definition: parameters declared in connections.yml
- Set configuration_definition: parameters declared in the test case YAML
- Implement get_data(): returns a pandas.DataFrame (native) or a Spark DataFrame (Spark)
- Set executed_action: the query, file path or command that was executed (used in logs)
Base class reference
class Connector:
    name = None
    connection_definition = None
    configuration_definition = None
    is_spark = False
    spark = None
    executed_action = None

    def get_data(self, configuration: dict, connection: dict):
        return None

    def get_executed_action(self):
        return self.executed_action
Minimal native connector
"""Connector to read data from FooBar"""

import pandas as pd
from connectors.connector import Connector


class ConnectorFooBar(Connector):
    """Connector to read data from FooBar"""

    def __init__(self):
        self.name = "FOOBAR"
        self.connection_definition = []
        self.configuration_definition = [
            {"name": "path"},
        ]

    def get_data(self, configuration: dict, connection: dict):
        # Record what was read so it appears in the logs
        self.executed_action = configuration["path"]
        df = pd.read_csv(configuration["path"])
        return df
Minimal Spark connector
"""Connector to read data from FooBar with Spark"""

from connectors.connector import Connector


class ConnectorFooBarSpark(Connector):
    """Connector to read data from FooBar with Spark"""

    def __init__(self):
        self.name = "FOOBAR_SPARK"
        self.is_spark = True
        self.connection_definition = []
        self.configuration_definition = [
            {"name": "path"},
        ]

    def get_data(self, configuration: dict, connection: dict):
        # Record what was read so it appears in the logs
        self.executed_action = configuration["path"]
        df = self.spark.read.format("foobar").load(configuration["path"])
        return df
For Spark connectors, the framework automatically injects the Spark session into self.spark at startup.
Step 3 — Define parameters
Parameters are validated and resolved at runtime by pyjeb via the control_and_setup function. This library handles default values, type casting, required-field validation and validset enforcement. You only need to declare the parameter definitions; Ploosh and pyjeb take care of the rest.
Each parameter is a dictionary with the following keys:
| Key | Required | Description |
|---|---|---|
| name | Yes | Parameter name, used as the key in YAML |
| default | No | Default value. If absent, the parameter is required |
| type | No | Type cast: string, integer, decimal, boolean, list, dict |
| validset | No | List of allowed values |
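To make these rules concrete, here is a simplified stand-in for the resolution step. It is not pyjeb's implementation and does not call pyjeb: the function resolve_parameters is hypothetical, it covers only three of the listed types, and mapping decimal to float is an assumption made for brevity.

```python
def resolve_parameters(values: dict, definitions: list) -> dict:
    """Illustrative re-implementation of the rules in the table (not pyjeb)."""
    # Assumption: only these three casts are handled in this sketch
    casts = {"string": str, "integer": int, "decimal": float}
    resolved = {}
    for definition in definitions:
        key = definition["name"]
        if key in values:
            value = values[key]
        elif "default" in definition:
            # A declared default (even None) makes the parameter optional
            value = definition["default"]
        else:
            raise ValueError(f"Missing required parameter: {key}")
        cast = casts.get(definition.get("type"))
        if cast is not None and value is not None:
            value = cast(value)
        if "validset" in definition and value not in definition["validset"]:
            raise ValueError(f"Invalid value for {key}: {value!r}")
        resolved[key] = value
    return resolved


definitions = [
    {"name": "query"},
    {"name": "timeout", "type": "integer", "default": 30},
    {"name": "mode", "default": "password",
     "validset": ["password", "connection_string"]},
]

resolved = resolve_parameters({"query": "SELECT 1", "timeout": "45"}, definitions)
print(resolved)
# → {'query': 'SELECT 1', 'timeout': 45, 'mode': 'password'}
```

In this sketch, timeout arrives as a string and is cast to an integer, mode falls back to its default, and omitting query raises an error because it has no default key.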
connection_definition
These parameters are declared in connections.yml and shared across all test cases using this connection.
self.connection_definition = [
    {
        "name": "mode",
        "default": "password",
        "validset": ["password", "connection_string"],
    },
    {"name": "hostname", "default": None},
    {"name": "database", "default": None},
    {"name": "username", "default": None},
    {"name": "password", "default": None},
    {"name": "port", "default": 3306, "type": "integer"},
    {"name": "connection_string", "default": None},
]
Parameters with "default": None are optional. Parameters without a default key are required.
configuration_definition
These parameters are declared in the test case YAML, under source or expected.
self.configuration_definition = [
    {"name": "query"},                                      # Required
    {"name": "connection"},                                 # Required
    {"name": "timeout", "type": "integer", "default": 30},  # Optional
]
Step 4 — Implement get_data()
The get_data method receives two dictionaries:
| Parameter | Description |
|---|---|
| configuration | Resolved test case parameters (from configuration_definition) |
| connection | Resolved connection parameters (from connection_definition), or None if no connection is needed |
- Set self.executed_action with a meaningful description (query, file path, etc.)
- Return a DataFrame: pandas.DataFrame for native connectors, Spark DataFrame for Spark connectors
Before get_data() is called, the framework has already validated the parameter values and applied defaults using pyjeb's control_and_setup.
Example with connection
def get_data(self, configuration: dict, connection: dict):
    # Assumes pandas (pd) and create_engine (e.g. from SQLAlchemy) are imported
    hostname = connection["hostname"]
    database = connection["database"]
    query = configuration["query"]

    # Record the query so it appears in the logs
    self.executed_action = query

    engine = create_engine(f"foobar://{hostname}/{database}")
    df = pd.read_sql(query, engine)
    return df
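When a connection declares a mode parameter, as in the connection_definition example earlier, get_data() has to branch on it. The helper below is a hypothetical sketch of that branching: build_connection_url is not part of Ploosh, and the foobar:// URL layout is assumed purely for illustration.

```python
from urllib.parse import quote_plus


def build_connection_url(connection: dict) -> str:
    """Build a URL for the hypothetical foobar:// scheme from resolved parameters."""
    if connection["mode"] == "connection_string":
        # The user supplied the full string, so use it unchanged
        return connection["connection_string"]
    # Escape credentials so characters such as "@" or "/" do not break the URL
    username = quote_plus(connection["username"])
    password = quote_plus(connection["password"])
    host = f"{connection['hostname']}:{connection['port']}"
    return f"foobar://{username}:{password}@{host}/{connection['database']}"


url = build_connection_url({
    "mode": "password",
    "hostname": "localhost",
    "database": "mydb",
    "username": "reader",
    "password": "secret",
    "port": 3306,
    "connection_string": None,
})
print(url)
# → foobar://reader:secret@localhost:3306/mydb
```

Keeping this logic in a small pure function makes it easy to unit test without opening a real connection.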
Step 5 — Add dependencies
If your connector requires an external Python package, you need to declare it in pyproject.toml.
1. Core dependency
If your dependency is a core library needed by the framework itself (e.g. pandas, pyjeb), add it with a pinned version to the dependencies list under [project]:
# pyproject.toml
[project]
dependencies = [
# ... existing dependencies
"my-package==1.2.3",
]
Core dependencies are always installed with pip install ploosh.
2. Connector-specific dependency (extra)
If your dependency is specific to your connector (e.g. a database driver), add it as an optional dependency (extra) under [project.optional-dependencies]. Create a dedicated extra named after your connector, and also add it to the full extra:
# pyproject.toml
[project.optional-dependencies]
my-connector = ["my-package==1.2.3"]
full = [
# ... existing connector dependencies
"my-package==1.2.3",
]
Users can then install only what they need:
pip install ploosh[my-connector] # core + your connector
pip install ploosh[full] # core + all connectors
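Because an extra may not be installed, a connector can defer its third-party import until get_data() runs. The following is a minimal sketch of that pattern. The module name foobar_driver and its read() function are hypothetical, and the class is a stand-in that does not extend Connector so that the snippet runs on its own.

```python
import importlib


class ConnectorFooBar:
    """Stand-in class; a real connector would extend Connector."""

    def _load_driver(self, module_name: str = "foobar_driver"):
        # Import at call time so this module still loads without the extra
        try:
            return importlib.import_module(module_name)
        except ImportError as error:
            raise ImportError(
                f"{module_name} is not installed. "
                "Install it with: pip install ploosh[my-connector]"
            ) from error

    def get_data(self, configuration: dict, connection: dict):
        driver = self._load_driver()
        # Hypothetical driver call, shown only to complete the example
        return driver.read(configuration["path"])


connector = ConnectorFooBar()  # works even though foobar_driver is missing
```

A plain import statement placed inside get_data() achieves the same deferral; the helper only adds a clearer error message that points users to the right extra.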
Import the optional dependency inside the connector's get_data method (not at module level), so that the connector module can still be loaded when the extra is not installed.
Step 6 — Add unit tests
Create a test file in tests/connectors/ named test_<name>.py.
The tests should:
- Instantiate the connector
- Prepare configuration and connection dictionaries
- Call control_and_setup to apply defaults (same behavior as the framework)
- Call get_data and validate the result
import pandas as pd
import pytest
from pyjeb import control_and_setup
from ploosh.connectors.connector_foobar import ConnectorFooBar


@pytest.fixture
def connector():
    return ConnectorFooBar()


def test_get_data(connector):
    configuration = {"path": "./tests/.data/sample.csv"}
    connection = {}

    # Apply defaults and validation the same way the framework does
    configuration = control_and_setup(configuration, connector.configuration_definition)
    connection = control_and_setup(connection, connector.connection_definition)

    df = connector.get_data(configuration, connection)

    assert not df.empty
    assert connector.executed_action == "./tests/.data/sample.csv"
Run the tests with:
pytest tests/connectors/test_foobar.py -v
Step 7 — Add documentation
Create a documentation page in docs/connectors/native/ or docs/connectors/spark/ following the existing format. Each connector page should include:
- A short description
- The list of connection parameters (if any) with types and defaults
- The list of configuration parameters with types and defaults
- A complete YAML example showing connections.yml and a test case
YAML usage after creation
Once the file is created, users can immediately use the connector in their test cases:
# connections.yml (only if connection_definition is not empty)
my_foobar:
  type: foobar
  hostname: localhost
  database: mydb

# test_case.yml
Test FooBar data:
  source:
    type: foobar
    connection: my_foobar
    query: "SELECT * FROM my_table"
  expected:
    type: csv
    path: ./expected/my_table.csv
The type value in YAML is case-insensitive and maps to the name attribute of the connector class.
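One way such a case-insensitive lookup could work is to index connectors by their upper-cased name. This is a sketch under that assumption, not Ploosh's actual lookup code: the registry dictionary, the find_connector function and the two stand-in classes are hypothetical.

```python
class ConnectorFooBar:
    name = "FOOBAR"


class ConnectorCsv:
    name = "CSV"


# Index connector instances by their upper-cased name
registry = {c.name.upper(): c for c in (ConnectorFooBar(), ConnectorCsv())}


def find_connector(type_value: str):
    """Look up a connector from the YAML type value, ignoring case."""
    try:
        return registry[type_value.upper()]
    except KeyError:
        raise ValueError(f"Unknown connector type: {type_value}") from None


print(type(find_connector("foobar")).__name__)
# → ConnectorFooBar
```

With this approach, type: foobar, type: FooBar and type: FOOBAR all resolve to the same connector, which is why the name attribute must be unique.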
Checklist
- [ ] File created as connector_<name>.py in src/ploosh/connectors/
- [ ] Class extends Connector and name starts with Connector
- [ ] name is set (uppercase, unique)
- [ ] connection_definition is set (empty list [] if no connection needed)
- [ ] configuration_definition is set
- [ ] is_spark = True if it's a Spark connector
- [ ] get_data() returns a DataFrame
- [ ] executed_action is set in get_data()
- [ ] Unit tests added in tests/connectors/
- [ ] Documentation added in docs/connectors/
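Some of these items can be checked mechanically before opening a pull request. The helper below is a hypothetical sketch, not a Ploosh utility: check_connector and the two stand-in classes are invented for illustration, and it covers only the items that can be verified without running get_data().

```python
def check_connector(cls) -> list:
    """Return a list of checklist problems for a connector class (empty if none)."""
    problems = []
    if not cls.__name__.startswith("Connector"):
        problems.append("class name must start with Connector")
    instance = cls()
    name = getattr(instance, "name", None)
    if not isinstance(name, str) or not name or name != name.upper():
        problems.append("name must be an uppercase string")
    for attribute in ("connection_definition", "configuration_definition"):
        if not isinstance(getattr(instance, attribute, None), list):
            problems.append(f"{attribute} must be a list")
    return problems


class ConnectorGood:
    def __init__(self):
        self.name = "FOOBAR"
        self.connection_definition = []
        self.configuration_definition = [{"name": "path"}]


class ConnectorBad:
    def __init__(self):
        self.name = "foobar"               # not uppercase
        self.connection_definition = None  # should be [] when unused


print(check_connector(ConnectorGood))
print(check_connector(ConnectorBad))
```

The remaining items (file location, returned DataFrame, tests and documentation) still need a manual review.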