Artículo
· 23 sep, 2024 Lectura de 8 min

DataPipe2 - Implementar un pipe de datos

¡Hola, desarrolladores!

En esta serie de artículos hemos hablado del framework iris-datapipe, de cómo nos ayuda a crear "pipes" de datos para la ingesta y procesamiento, y de cómo instalarlo. Vamos a profundizar en cómo implementar uno de esos "pipes" paso a paso.

Si llegaste directamente a este artículo, te recomiendo revisar los anteriores y recordar que iris-datapipe incluye un QuickStart para que puedas explorar sus funcionalidades rápidamente 👌.

El ejemplo que abordaremos está incluido en el QuickStart, por lo que puedes utilizarlo como referencia.

Definir un nuevo Pipe

Comienza por definir un nuevo Pipe en la interfaz gráfica. Básicamente, solo se trata de asignar un código y una descripción.

Opcionalmente, puedes especificar un recurso de seguridad de IRIS que se requerirá para poder operar con ese Pipe (esto es útil si necesitas crear pipes que solo sean accesibles a determinados usuarios).

En el ejemplo, definimos un pipe llamado REST-API, que se encargará de procesar datos recibidos desde una API REST, donde llegarán datos sobre personas.

image

Procesamiento de los datos

Para procesar los datos utilizando un "pipe", necesitamos seguir los siguientes pasos:

1) Definir un modelo DataPipe

Debemos definir un modelo para los datos que queremos procesar.

Un modelo no es más que una clase que hereda o extiende de DataPipe.Model.cls donde tendrás que implementar algunos métodos.

Tu modelo debe implementar:

  • Cómo serializar/deserializar tus datos (por ejemplo, usando XML o JSON).
  • Cómo normalizar y validar tus datos.
  • Y finalmente, qué operación quieres ejecutar sobre tus datos una vez están normalizados y validados.

En mi ejemplo, el modelo que utilizaré será DataPipe.Test.REST.Models.Person.cls.

Dado que el modelo es una clase convencional en InterSystems IRIS, puedes añadir herencia u otro comportamiento que necesites. En mi caso, heredo de DataPipe.Test.REST.Msg.PersonData, donde tengo definidas las propiedades que me interesa tratar.

El modelo DataPipe.Test.REST.Models.Person.cls tiene diferentes métodos:

  • Serialize, Deserialize: Se utilizan para indicar cómo serializar/deserializar el modelo. En este caso, utilizo JSON.
  • Normalize: Especifica cómo quiero normalizar el modelo. En mi caso, solo quiero llamar a una transformación de datos.
/// Normalize model
Method Normalize(Output obj As DataPipe.Model) As %Status
{
    set ret = $$$OK
    try {
        // call normalizaton data transform
        set sc = $classmethod("DataPipe.Test.REST.DT.PersonNormalize", "Transform", $this, .obj)
        $$$ThrowOnError(sc)

    } catch ex {
        set ret = ex.AsStatus()
    }
    quit ret
}
  • Validate: Indica cómo quiero validar si mi modelo es correcto o no. Puedo añadir "warnings" también. Puedes implementar lo que necesites:
/// Validate model
Method Validate(Output errorList As %List) As %Status
{
    #define AddError(%list, %code, %desc) set error = ##class(DataPipe.Data.ErrorInfo).%New() set error.Code=%code set error.Desc=%desc do %list.Insert(error)

    set ret = $$$OK
    try {
        set errorList = ##class(%ListOfObjects).%New()

        // date of birth
        if ..DOB="" { 
            $$$AddError(errorList, "V001", "DOB required")
        } else {
            set yearDOB = $extract($zdate(..DOB,8),1,4)
            if (yearDOB < 1930) $$$AddError(errorList, "V002", "DOB must be greater than 1930")
            if (yearDOB < 1983) $$$AddError(errorList, "W083", "Warning! Older than 1983")
        }

        // model is invalid if errors (not warnings) found
        for i=1:1:errorList.Count() {
            set error = errorList.GetAt(i)
            set errorCode = error.Code

            // in this sample model, all warnings start with "W"
            if errorCode'["W" {
                $$$ThrowStatus($$$ERROR($$$GeneralError, "Invalid"))
            }
        }

    } catch ex {
        set ret = ex.AsStatus()
    }

    quit ret
}
  • GetOperation: Indica qué Business Operation quiero que ejecute la operación sobre los datos (lo entenderás mejor cuando comentemos los componentes de interoperabilidad).
/// Return the Business Operation name that will run the operation with the model
/// Each Business Operation can be used to hold different queues
Method GetOperation() As %Status
{
    quit "Person Operation"
}
  • RunOperation: Esta es la operación que quiero ejecutar sobre mis datos. Puedo guardarlos en la base de datos o llamar a otro componente de interoperabilidad para continuar el procesamiento más adelante. En mi ejemplo, los guardo en una global y también los envío a otro Business Process.
/// Run final operation with the model
/// This method can be used to persit data from the model to an operational data store
Method RunOperation(Output errorList As %List, Output log As %Stream.Object, bOperation As Ens.BusinessOperation = "", Output delayedProcessing As %Boolean = 0) As %Status
{
    #define AddError(%list, %code, %desc) set error = ##class(DataPipe.Data.ErrorInfo).%New() set error.Code=%code set error.Desc=%desc do %list.Insert(error)
    #define AddLog(%log, %msg) do %log.WriteLine("["_$zdt($h,3)_"] "_%msg)

    set errorList = ##class(%ListOfObjects).%New()
    set log = ##class(%Stream.GlobalCharacter).%New()

    set ret = $$$OK
    try {
        TSTART
        $$$AddLog(log, "Transaction Started")

        // simulate an operation error
        if ##class(Ens.Util.FunctionSet).In(..Name, ##class(DataPipe.Test.HL7.Helper).OperationErrorNames()) {
            $$$ThrowStatus($$$ERROR($$$GeneralError, "Simulated Operation Error"))
        }

        // store serialized model
        $$$ThrowOnError(..Serialize(.stream))
        set ^zDataPipe($i(^zDataPipe)) = stream.Read()
        $$$AddLog(log, "Model Stored in ^zDataPipe("_$get(^zDataPipe)_")")

        TCOMMIT
        $$$AddLog(log, "Transaction Commited")

        // you can send messages to other production components (while you are not on an open transaction)
        // you can use this feature to continue processing the record in other component (delayed processing)
        set delayedProcessing = 1
        if $isobject(bOperation) {
            set req = bOperation.OperRequest
            $$$ThrowOnError(bOperation.SendRequestAsync("REST Delayed Oper Update", req))
        }

    } catch ex {
        TROLLBACK 
        $$$AddLog(log, "Rollback!")

        set ret = ex.AsStatus()
        $$$AddLog(log, "Error catched: "_$system.Status.GetOneStatusText(ret))

        // include exception errors into errorList
        do $system.Status.DecomposeStatus(ret, .errors)
        for i=1:1:errors {
            $$$AddError(errorList, "Exception", errors(i))
        }
    }
    quit ret
}

2) Añadir componentes de interoperabilidad

Después de definir tu modelo, necesitas configurar una producción de interoperabilidad usando componentes de DataPipe. iris-datapipe incluye componentes preconstruidos que pueden operar con un modelo DataPipe como el que hemos definido anteriormente.

El único proceso que debes implementar es el proceso de ingestión.

2.1) Crear un Proceso de Ingestión

Necesitas crear un nuevo Business Process que utilice como contexto DataPipe.Ingestion.BP.IngestionManagerContext.

Este proceso recibirá la entrada de datos que decidas (en este ejemplo, el mensaje que envía una API REST que actúa como Business Service) y debe implementar:

Identificación de los datos que procesas (InboxAttributes):

  • Debes identificar el registro que estás procesando y proporcionar los InboxAttributes.
  • El "pipe" al que pertenece este registro debe indicarse en este momento.
  • Estos atributos describirán el registro que estás tratando y luego se utilizarán para la búsqueda desde la interfaz gráfica.
  • Para realizar lo anterior, puedes utilizar transformaciones de datos, código, ¡lo que necesites!

Convertir los datos de entrada en el modelo de datos que definiste previamente:

  • Debes utilizar transformaciones, código o lo que prefieras para transformar la información de entrada al modelo de datos que has desarrollado previamente.

image

2.2) Añadir el resto de componentes

El resto de los componentes de interoperabilidad son proporcionados por iris-datapipe y ya están preconstruidos. Estos componentes llamarán a los distintos métodos implementados en tu modelo.

En general, necesitarás añadir a la producción, por cada "pipe" diferente que quieras implementar:

Aquí tienes la producción de ejemplo que se utiliza en el QuickStart.

Con todo esto, cuando comiencen a llegar datos y se procesen, podrás verlos directamente desde la interfaz gráfica.

image

¡Espero que os sea útil!

Comentarios (0)1
Inicie sesión o regístrese para continuar