A reusable, customizable and workflow based task service which creates some actionable tasks based upon various events happening via different microservices in a distributed system.
this.bind(TaskServingBindings.Config).to({useCustomSequence:true,// enable this if you want to use your own sequence instead of one provided by the task service// though note that using a custom sequence may break or completely disable the authentication and authorization implemenation of task service});
to use the task service you need to bind atleast two datasources (though both could connect to the same db)
the dataSourceName property of these two should be WorkflowServiceSourceName and TaskDbSourceName variables exported by the task service. For example, one of the datasources would look something like this -
import{inject,lifeCycleObserver,LifeCycleObserver}from'@loopback/core';import{juggler}from'@loopback/repository';import{TaskDbSourceName}from'@sourceloop/task-service';import{config}from'./config';// Observe application's life cycle to disconnect the datasource when// application is stopped. This allows the application to be shut down// gracefully. The `stop()` method is inherited from `juggler.DataSource`.// Learn more at https://loopback.io/doc/en/lb4/Life-cycle.html@lifeCycleObserver('datasource')exportclassPgDbDataSourceextendsjuggler.DataSourceimplementsLifeCycleObserver{staticdataSourceName=TaskDbSourceName;// this is the line that should variable from task servicestaticreadonlydefaultConfig=config;constructor(@inject(`datasources.config.${TaskDbSourceName}`,{optional:true})dsConfig:object=config,){super(dsConfig);}}
To run the migrations provided by the task service (available in the migrations folder of both the source code and the packed version in your node_modules), use the db-migrate package.
Run the HTTP migrations only if you plan to use the Http Outgoing connector.
Additionally, there is now an option to choose between SQL migration or PostgreSQL migration.
NOTE : For @sourceloop/cli users, this choice can be specified during the scaffolding process by selecting the "type of datasource" option.
This command expects an input variable holding a list of tasks. This command creates all the tasks in this variable and triggers an outgoing event with the created tasks. To trigger this command at a node, use the topic - create-tasks and provide a variable with structure -
This command is expected to be used as a topic for the end event of a task workflow. For example, in case of camunda, it would be topic for Message End Event node. To use this command, use the topic - end-task
Task service needs an IIncomingConnector and an IOutgoingConnectors implementation to send and receive events from and to an external source. By default, task service comes with 2 different sets of connectors. Note that you can use different types of incoming and outgoing connectors (For e.g. Incoming events are received through Kafka but outgoing events go to a webhook subcriptions using Http)
This set of connectors implements connectors to receive and send events through Kafka. It can be bound as both incoming and outgoing connector and needs an extra binding of an adapter for adapting kafka events to the type expected by the task service. You need to install kafkajs to use these connectors - npm i kafkajs
// Bind the config for Kafka connectorsthis.bind(TaskServiceKafkaModule.CONFIG).to({// this part is required for both incoming and outgoing connectorsconnection:{clientId:process.env.KAFKA_CLIENT_ID,brokers:[...(process.env.KAFKA_SERVER?.split(',')??['localhost:9092'])],...(process.env.KAFKA_SSL==='true'?{ssl:true,}:{}),},// this part is required only if you use it as an incoming connectorconsumer:{groupId:process.env.KAFKA_GROUP_ID??'task-service',},topics:['sow.update'],// topics for receiving events// this part is required only if you use it as an outgoing connectorproducer:{},output:{topic:'test',// topic for output events},});// bind the connector as an incoming connector if requiredthis.bind(TaskServiceBindings.INCOMING_CONNECTOR).toClass(KafkaStreamService);// bind the connector as an outgoing connector if requiredthis.bind(TaskServiceBindings.OUTGOING_CONNECTOR).toClass(KafkaStreamService);// bind the default adapter (it is available in `@sourceloop/task-service/kafka`)this.bind(TaskServiceKafkaModule.ADAPTER).toClass(KafkaEventAdapter);
// this line binds both the incoming and outgoing connectors plus some controllers required by the boththis.component(TaskHttpComponent);// you can override either of the connectors by adding a new binding for them after the above
{"url":"http://localhost:3000",// the url that will hit with the payload for every outgoing event"key":"event-key"// the event keys for which this url would be hit}
and a couple of required request headers - x-api-key and x-api-secret.
values for these headers are generated through another endpoint - /client-apps. This endpoint is supposed to be hit once by each new client and returns newly generated key and secret that are used for sending and verifying webhook calls. The call to the /client-apps
expects following body -
each webhook call also sends two headers - x-task-signature and x-task-timestamp to help validate the authenticity of the webhook calls by the client. This signature can be validated by the client by generating an HMAC using the event payload and the timestamp. A sample node.js code on how to do this is given below -
functionvalidateSignature(request){constsignature=request.headers['x-task-signature'];consttimestamp=request.headers['x-task-timestampt'];constpayload=request.body;// the secret in this line is the one generated by the /client-apps endpointconsthmac=createHmac('sha256',yourApiSecret);hmac.update(`${JSON.stringify(event)}:${timestamp}`);constexpectedSignature=hmac.digest('hex');if(// compare both the strings!crypto.timingSafeEqual(Buffer.from(expectedSignature),Buffer.from(signature),)){// throw an error if signature does not matchthrownewHttpErrors.Unauthorized(INVALID_WEBHOOK_SIGNATURE);}returntrue;}
This service supports Sequelize as the underlying ORM using @loopback/sequelize extension. And in order to use it, you'll need to do following changes.
1.To use Sequelize in your application, add following to application.ts:
If you are using asymmetric token signing and verification, you should have auth datasource present in the service and auth redis datasource on the facade level. Example datasource file for auth database is:-
import{inject,lifeCycleObserver,LifeCycleObserver}from'@loopback/core';import{juggler}from'@loopback/repository';import{AuthDbSourceName}from'@sourceloop/core';constDEFAULT_MAX_CONNECTIONS=25;constDEFAULT_DB_IDLE_TIMEOUT_MILLIS=60000;constDEFAULT_DB_CONNECTION_TIMEOUT_MILLIS=2000;constconfig={name:'auth',connector:'postgresql',host:process.env.DB_HOST,port:process.env.DB_PORT,user:process.env.DB_USER,schema:process.env.DB_SCHEMA,password:process.env.DB_PASSWORD,database:process.env.AUTH_DB,};// Observe application's life cycle to disconnect the datasource when// application is stopped. This allows the application to be shut down// gracefully. The `stop()` method is inherited from `juggler.DataSource`.// Learn more at https://loopback.io/doc/en/lb4/Life-cycle.html@lifeCycleObserver('datasource')exportclassAuthDataSourceextendsjuggler.DataSourceimplementsLifeCycleObserver{staticdataSourceName=AuthDbSourceName;staticreadonlydefaultConfig=config;constructor(@inject('datasources.config.auth',{optional:true})dsConfig:object=config,){if(!!+(process.env.ENABLE_DB_CONNECTION_POOLING??0)){constdbPool={max:+(process.env.DB_MAX_CONNECTIONS??DEFAULT_MAX_CONNECTIONS),idleTimeoutMillis:+(process.env.DB_IDLE_TIMEOUT_MILLIS??DEFAULT_DB_IDLE_TIMEOUT_MILLIS),connectionTimeoutMillis:+(process.env.DB_CONNECTION_TIMEOUT_MILLIS??DEFAULT_DB_CONNECTION_TIMEOUT_MILLIS),};dsConfig={...dsConfig,...dbPool};}super(dsConfig);}}