Productsup

Data service connector

Transform, validate, or enrich data within the Productsup pipeline.

A data service connector transforms data within the Productsup pipeline. Unlike data source and export connectors that move data in or out of the platform, a data service operates on data that's already there — validating, enriching, or reshaping it.

Data flow

Productsup platform → Container API (input) → Your connector → Container API (output) → Productsup platform

                                              Event logs + feedback

Your connector reads products from the input, applies transformations, and writes the results back to the output. It can also write event logs for monitoring and audit trails.

How it works at runtime

  1. The platform starts your Docker container and runs your CLI command
  2. Your code reads products from the Container API input
  3. You transform, validate, or enrich each product
  4. You write the transformed products to the Container API output
  5. Optionally, you write event logs to report warnings or errors
  6. Exit with code 0 (success) or non-zero (failure)

Reading and writing data

Data service connectors read from input and write to output — the same methods used by export and data source connectors.

Read → Transform → Write

foreach ($this->containerApi->yieldBatchFromInputFile(500) as $batch) {
    $transformed = \array_map(function (array $product): array {
        // Your transformation logic
        $product['transformed_at'] = \date('c');
        return $product;
    }, $batch);

    $this->containerApi->appendManyToOutputFile($transformed);
}

Event logs

Data service connectors can write structured event logs to report issues found during processing. Event logs use predefined event codes and can include user-facing information.

use Productsup\CDE\ContainerApi\EventLog\EventLogFile;
use Productsup\CDE\ContainerApi\EventLog\Event;
use Productsup\CDE\ContainerApi\EventLog\EventCodes;

$eventLogFile = new EventLogFile(\sys_get_temp_dir() . '/event_log.json');

$eventLogFile->addEvent(
    (new Event(EventCodes::WARNING_MISSING_OPTIONAL_FIELD))
        ->setUserInfo(['Description' => 'Field "color" is missing from 12 products.'])
);

$eventLogFile->save();
$this->containerApi->saveEventLogRawPayload(\json_encode($eventLogFile));

Example service

A minimal data service that adds a timestamp to every product:

<?php

namespace App\DataService\Application\Service;

use Productsup\CDE\ContainerApi\ContainerApiInterface;

readonly class DataServiceService
{
    private const BATCH_SIZE = 500;

    public function __construct(
        private ContainerApiInterface $containerApi,
    ) {}

    public function run(): void
    {
        $timestamp = \date('c');

        foreach ($this->containerApi->yieldBatchFromInputFile(self::BATCH_SIZE) as $batch) {
            $transformed = \array_map(
                static fn(array $product): array => [...$product, 'processed_at' => $timestamp],
                $batch,
            );

            $this->containerApi->appendManyToOutputFile($transformed);
        }

        $this->containerApi->info('Data service complete.');
    }
}

Configuration

Data service connectors have several type-specific configurations:

  • Column prefix — a unique prefix for columns generated by this service (convention: starts with ___)
  • Service typeInternal (runs within the platform) or External (file-based)
  • Stages — which processing stages this service runs in (Import or Intermediate)
  • Max usage — optional limit on how many sites can use this service

See Type-specific config for details.

Next steps

How is this guide?

On this page