Tuesday
May 22, 2012

DDD/CQRS Challenge - Integrating Distributed Systems

Let's have a look at a relatively simple DDD/CQRS challenge: integrating the elements of a distributed system that is composed of different bounded contexts and deployed across different hosting environments.

Let's imagine a small Software-as-a-Service company which provides a subscription-based service and charges customers per consumption on a pay-as-you-go basis. The software infrastructure of such a company could consist of only 3 bounded contexts (a major oversimplification on my part, the bigger view might be more complicated):

  • Subscriptions - a subscription management system that keeps track of all customers, their active plans, billing information, invoices, monthly service consumption and available login keys. This system is architected as a NoSQL solution with event sourcing and is deployed on a dedicated server (with plans to redeploy it to Azure some time later).

  • Cloud Services Integration - a massively scalable set of services deployed in Windows Azure (e.g. using some big data processing design). Among other things, these services expose an API to 3rd party companies and even to products of the same company. This API is secured by user tokens, which are replicated from the Subscriptions BC. This project is stable and does not change frequently.

  • Product 1 - a new product being delivered by the company. It is developed as a standalone set of systems that enhance user experience, using the Cloud API. This product leverages authentication and user management capabilities from "Subscriptions" and interoperates with the API.

Here are some examples of the interactions between these systems:

  • If a new user is added to Subscriptions, its auth credentials should be replicated to Cloud Services immediately (within 1-2 seconds) to enable access via the API.
  • If a customer's account is locked out due to balance overdraft, then all related users should be locked out of the API.
  • When service consumption is detected in the API, it should be reported to the Subscriptions portal within 5 minutes.

Naturally, all these systems have to work independently, so that if one of them is down, the rest continue doing their part (at the very least by providing a read-only UI, at best by doing everything that does not depend on the other systems).

For example, if Subscriptions is down for maintenance, Cloud Services and Product 1 should continue working as they were (all pending changes should be replicated after the system comes back online).

Additional constraints:

  • The resulting design (with its inherent implications) should be relatively easy to explain to a Junior dev.
  • It should also be relatively straightforward to deploy and run the systems both locally (xcopy deployment of .NET code) and in the cloud.
  • Systems should be able to change independently and rapidly as they follow their individual DDD evolution paths (for example, weekly releases with new business processes but without breaking any relations).
  • No more than 3 people per project to develop and maintain it.

Note that we are focusing here only on the integration between the systems. The internal design of each system might affect such integration, but is less relevant in this case. Still, it would be nice if the integration patterns shared a natural affinity with the internal design of each bounded context (this tends to create systems that are more robust and practical).

How would you approach this problem?

Sunday
May 20, 2012

DDD Summit 2012 Summary - #DDDesign

Some time ago I was honored (probably because of some sheer mistake) to be invited to DDD Summit 2012 organized by Eric Evans. The event took place in Portland over the course of last week and it was an absolute blast for me.

From left to right:

  • Cameron Purdy - VP of development at Oracle [Blog, twitter]
  • Daniel Gackle - CoFounder at SkySheet
  • Randy Stafford - A-Team at Oracle [About]
  • Jimmy Nilsson - Factor10, Author of Applying Domain-Driven Design and Patterns [Blog, Twitter]
  • Rebecca Wirfs-Brock - Wirfs-Brock and Responsibility-Driven Design [Wiki]
  • Vaughn Vernon - Consultant at ShiftMethod, Author of the next DDD Book [Blog, Twitter]
  • Andreas Brink - Software developer & coach at Factor10 [Blog]
  • Alberto Brandolini - Avanscoperta, DDD Instructor and master of tomato [Blog, Twitter]
  • Eric Evans - Father of DDD and founder of Domain Language [Twitter]
  • Dan Bergh Johnsson - consultant and partner at OmegaPoint [Blog, Twitter]
  • Paul Rayner - DDD Instructor at Domain Language [Twitter, Blog]
  • Martin Fowler - needs no introductions [Wiki, Twitter]
  • Patrik Fredriksson - Consultant at Citerus AB, DDD Instructor [Blog, Twitter]
  • Rinat Abdullin - accidental visitor, Tech Leader of Lokad

In essence, we were discussing various aspects of DDD in small focus groups and all together, going out on hikes and dinners, talking, talking and talking. There was a wide variety of topics covered during these three days.

For me, this was an amazing opportunity to dive deeper into DDD nuances, practical experience and ideas shared by the thought leaders of the field.

Outcomes and Commitments

Long story short, there are some outcomes of this event that should benefit the community (including some of my personal commitments).

There is going to be more activity on the DDDCommunity.org site, which might have felt a little neglected over the course of the last year (with a lot of good materials being hidden by accident). Things will start improving within the next few weeks (more regular updates and a more structured community).

I personally hope to contribute some small practical bits of community-building experience from projects and groups on DDD/CQRS (e.g. CQRSGuide and CQRS Beers with awesome communities around the EU), Lokad projects and the Distributed Podcast, maybe even bringing all these closer together.

Practical samples are a big thing for demonstrating DDD. We plan to bring them together on GitHub and refresh them a little bit (maybe even adding a Scala/Akka implementation!).

My own Lokad.CQRS Sample Project is essentially a sample of DDD (with all the latest tech stack, cloud portability, practical things and even some DDD modeling toolset) that currently lacks the most important thing - a proper domain model. I plan to fix this last problem by mid-June, in addition to committing to things I've been delaying far too long (e.g. articles and videos about the sample project).

More articles and materials about Domain-Driven Design are expected to be published; some of the topics were already outlined by the summit participants. There are plans to establish a better environment for reviewing and shepherding such materials, which should definitely help.

I personally plan to continue writing about various aspects of DDD/CQRS+ES. Hopefully these new articles will start making slightly more sense due to focus not only on disposable technical details (which CQRS+ES is, despite all its coolness and cloud-portability), but also on strategic and design decisions governed by DDD. Besides, some more peer pressure is expected on them :)

There are even more cool plans for pushing DDD and the state of the art in the community, but they are long-term. Meanwhile, you can stay tuned for updates via #DDDesign (I'll make sure that any official DDDCommunity news will be auto-published on twitter via this tag as well).

How does this look?

Tuesday
May 8, 2012

Dark Visual Studio Theme

This is for those of you who were asking about the color scheme used in my Visual Studio and console.

Here's the DarkColors.zip with explanations below.

For the text editor I'm using the dark version of the amazing Solarized color scheme by Ethan Schoonover, slightly adapted to play nicely with R# formatting (see Exported-Fonts-and-colors.vssettings).

Tabs and window elements in Visual Studio were adjusted with the Theme Manager Package for Visual Studio. I used a theme derived from Elite Dark. My Windows console actually uses the Console2 manager hosting bash. Color settings are stored in xml files; I provided mine for reference in Settings-for-console2.xml.

I find these color settings to be less straining on the eyes in various lighting conditions.

Saturday
May 5, 2012

Move Forward by Discarding Complex Tech

Good things are either the well-forgotten past or a complete rip-off from nature. It seems that at Lokad we are going all the way back in time ourselves as well.

Over the course of the last few days we had really interesting times at the Ufa office, while migrating the entire event replication infrastructure to a new model. If you wish, you can call this infrastructure the bounded context of a digital nervous system, represented by the green arrows in our context maps. This is a really interesting place for us, since it "touches" multiple other bounded contexts and actually crosses 2 clouds and 1 additional datacenter deployment-wise. Change shocks are mesmerizing to observe.

Now, instead of a mixture of Azure queue delivery and ZeroMQ streaming, our applications just push large event streams over a hand-made HTTP replication protocol. It is built on HttpListener and WebRequests, which:

  • are rather performant;
  • are dead-simple and well understood;
  • introduce minimal friction when adding replication to new projects (ZeroMQ is pretty invasive here, if you go for Azure);
  • can be debugged with a lot of HTTP-based tools.

The design is rather simple, practical and works well for streams of half a million events (albeit performance could be improved a lot). This was really important, since we now have a number of bounded contexts to integrate and the volume of event streams just keeps on growing.
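To make the idea concrete, here is a minimal sketch of push-style event replication over plain HTTP. It uses the Python standard library instead of HttpListener, and the JSON payload shape (`start`, `events`, `version`) as well as all class and function names are assumptions made for illustration; this is not the actual Lokad protocol.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer


class ReplicaStore:
    """In-memory stand-in for the event store on the receiving side."""

    def __init__(self):
        self.events = []
        self.lock = threading.Lock()

    def append(self, start, batch):
        """Append a batch that begins at offset `start`; return the new length."""
        with self.lock:
            have = len(self.events)
            if start <= have:
                # Skip the part we already hold (the sender retried a batch).
                self.events.extend(batch[have - start:])
            # If start > have there is a gap: append nothing, report our length.
            return len(self.events)


def make_handler(store):
    class Handler(BaseHTTPRequestHandler):
        def do_POST(self):
            length = int(self.headers.get("Content-Length", 0))
            payload = json.loads(self.rfile.read(length))
            version = store.append(payload["start"], payload["events"])
            body = json.dumps({"version": version}).encode("utf-8")
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)

        def log_message(self, *args):
            pass  # keep the sketch quiet

    return Handler


# Opener that ignores proxy settings, since we only talk to localhost here.
_opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))


def push(url, start, events):
    """POST one batch; the receiver answers with how many events it holds."""
    data = json.dumps({"start": start, "events": events}).encode("utf-8")
    request = urllib.request.Request(
        url, data=data, headers={"Content-Type": "application/json"})
    with _opener.open(request) as response:
        return json.loads(response.read())["version"]


def replicate(url, log, batch_size=2):
    """Push everything the receiver is missing, batch by batch."""
    version = push(url, 0, [])  # an empty batch just asks for the version
    while version < len(log):
        version = push(url, version, log[version:version + batch_size])
    return version


def demo():
    store = ReplicaStore()
    server = HTTPServer(("127.0.0.1", 0), make_handler(store))
    threading.Thread(target=server.serve_forever, daemon=True).start()
    url = "http://127.0.0.1:%d/" % server.server_port
    log = [{"id": i, "type": "SampleEvent"} for i in range(5)]
    try:
        replicate(url, log)
        replicate(url, log)  # running it again must not duplicate anything
    finally:
        server.shutdown()
        server.server_close()
    return store.events
```

Because the receiver reports its current version on every call, a sender that crashed halfway can simply start over, and any HTTP debugging tool can be used to inspect the traffic.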

It is curious, how our movement forward towards better and simpler designs happens concurrently with stepping back from complex technologies to much simpler ones. In other words, we gain by discarding things.

Another example of such behavior is our recent decision to discard ProtoBuf as the storage format for large data objects, replacing ProtoBuf+Gzip with TSV+Gzip. This applies specifically to bounded contexts that deal with big data. The reasons are:

  • ProtoBuf by default loads all objects directly into memory at once (imagine a dataset of 1 GB), while the default behavior of text files is streaming;
  • For numerical data TSV+Gzip compresses better than ProtoBuf+Gzip, since archivers were initially designed and optimized specifically for handling text data;
  • You can read and parse a TSV dataset with tools on any platform, including scripts and Excel, while with ProtoBuf some intermediate dancing would be required.
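The streaming argument can be illustrated with a short sketch. It is written in Python with the standard `gzip` and `csv` modules; the file name and the two-column layout (date, quantity) are made up for the example.

```python
import csv
import gzip
import os
import tempfile


def write_tsv_gz(path, rows):
    """Write rows as tab-separated text, compressed with gzip on the fly."""
    with gzip.open(path, "wt", newline="", encoding="utf-8") as stream:
        csv.writer(stream, delimiter="\t").writerows(rows)


def stream_tsv_gz(path):
    """Yield one parsed row at a time; the file is never loaded as a whole."""
    with gzip.open(path, "rt", newline="", encoding="utf-8") as stream:
        for row in csv.reader(stream, delimiter="\t"):
            yield row


def total_quantity(path):
    # Aggregates in constant memory, no matter how large the file is.
    return sum(int(quantity) for _, quantity in stream_tsv_gz(path))


def demo():
    # Hypothetical dataset: (date, quantity) pairs.
    rows = [("2012-05-0%d" % day, day * 10) for day in range(1, 6)]
    path = os.path.join(tempfile.mkdtemp(), "series.tsv.gz")
    write_tsv_gz(path, rows)
    return total_quantity(path)
```

The same file can be inspected from a shell with `zcat` or opened in a spreadsheet after decompression, which is the portability point made above.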

So, if I can reduce a number of technologies in a given bounded context, while making it more practical and performant, then that's a clear choice.

As you can see, in certain scenarios, we are stepping back from cool and smart tech towards something more practical and simple. This "stepping back" actually enables us to solve certain problems that exist in this specific scenario. Surprisingly enough, this brings us closer to the Unix philosophy:

Write programs that do one thing and do it well. Write programs to work together. Write programs to handle text streams, because that is a universal interface.

I certainly didn't expect to see this happening before, not even theoretically. However, in practice, there is a big difference between theory and practice.

Caveats

Please keep in mind that:

  • we are aware of ProtoBuf capability to read items sequentially.
  • we still will be using ProtoBuf for serializing messages, including events that are used for our event sourcing scenarios (leveraging for .NET development a wonderful library by Marc Gravell)
  • these examples just serve the purpose of illustrating possibility of cases, where you can move forward by discarding a technology. Specific decisions might not be applicable directly to your case.

Wednesday
May 2, 2012

Processing Big Data in Cloud à la Lokad

Let's talk about a simple approach to visualise, model and deliver complex large-scale data processing tasks. Such tasks deal with datasets that are so large that they don't fit into the memory of a single machine and would take ages to compute on a single machine. These datasets are often referred to as "BigData".

Such tasks benefit from distributing the work and storage load between relatively cheap machine instances that are made available in the cloud (either public or "private"). We would also want to optimize our consumption costs by acquiring these resources only when they are needed and releasing them afterwards.

Let's also assume that such a processing task requires a complex sequence of steps in order to complete (more complex than a mere MapReduce). Some steps must be processed before others can start, while others can work in parallel batches. The actual steps of the job are idempotent, since messages can be delivered more than once or simply fail.

Here is an example of what such a processing graph could look like:

This is how I would approach such a problem in a situation when:

  • development resources are limited;
  • data processing model is not formally established and is likely to evolve;
  • team (or a single developer) is familiar with event sourcing.

I would split the problem domain into two separate bounded contexts: Orchestration and Data Processing.

Bounded Contexts

Orchestration Bounded Context will be responsible for navigating the data processing graph and orchestrating the individual jobs. Behaviors for that will be captured inside an aggregate root that uses event sourcing for persistence (AR+ES), for better testing and for getting persistence mismatch troubles out of the way. Deployment-wise, this aggregate can live on a separate machine and would be configured in such a way that all commands to this aggregate are synchronized and executed on a single thread (just a routing rule for messages).

AR+ES just schedules batches of tasks that can be executed in parallel, and issues the second batch only when the first one is complete.

Should there be any message duplication (always a possibility in the cloud environments), AR+ES can easily track and drop duplicates by keeping hashes of already completed task batch identifiers.
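A minimal sketch of such an aggregate, in Python, could look as follows. The event names, the list-of-batches representation of the graph and the use of plain task identifiers (instead of hashes of batch identifiers) are simplifying assumptions for illustration only.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskScheduled:
    batch: int
    task_id: str


@dataclass(frozen=True)
class TaskConfirmed:
    batch: int
    task_id: str


@dataclass(frozen=True)
class ProcessCompleted:
    pass


class ProcessAggregate:
    """Event-sourced orchestrator: state is rebuilt purely from events."""

    def __init__(self, batches, history=()):
        self.batches = batches    # e.g. [["a1", "a2"], ["b1"]]
        self.pending = set()      # scheduled but not yet confirmed
        self.current = -1         # index of the batch in flight
        self.completed = False
        self.changes = []         # new events produced by this instance
        for event in history:
            self._mutate(event)

    def start(self):
        if self.current == -1:
            self._schedule(0)

    def confirm(self, task_id):
        if task_id not in self.pending:
            return  # duplicate or unknown confirmation: drop it silently
        self._apply(TaskConfirmed(self.current, task_id))
        if not self.pending:
            following = self.current + 1
            if following < len(self.batches):
                self._schedule(following)  # batch done, issue the next one
            else:
                self._apply(ProcessCompleted())

    def _schedule(self, index):
        for task_id in self.batches[index]:
            self._apply(TaskScheduled(index, task_id))

    def _apply(self, event):
        self.changes.append(event)
        self._mutate(event)

    def _mutate(self, event):
        # The only place where state changes, so replay and live use agree.
        if isinstance(event, TaskScheduled):
            self.current = event.batch
            self.pending.add(event.task_id)
        elif isinstance(event, TaskConfirmed):
            self.pending.discard(event.task_id)
        elif isinstance(event, ProcessCompleted):
            self.completed = True
```

Since all decisions are expressed as events, a test can feed a history into the constructor, call one method and assert on `changes`, which is the "specification" style of testing that this approach makes possible.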

Data Processing Bounded Context will be implemented using a set of command handlers that consume work commands from an input queue and process them. These handlers would operate upon data that is stored somewhere in the cloud and is considered to be immutable for the duration of the specific big data process. Commands and events can contain meta-data, parameters and references to this immutable data.

In essence, a command handler is a function that takes a command as input (which could contain a reference to a large immutable data blob), performs certain operations and publishes an event (optionally saving some large processing data into another data blob).

Multiple instances of command handlers would be picking commands from the input queue in this bounded context. In essence, they would be competing for the jobs, just like clerks in a bank "compete" for customers standing in line (each customer is handled by only one clerk). However, we would be better than a bank, since we can always massively increase the number of command handlers handling the load by instructing the cloud fabric to provision more machines.
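The "competing clerks" idea can be sketched on a single machine with threads and an in-memory queue. This is only an illustration in Python: the command and event shapes, the `blobs` dictionary standing in for cloud blob storage and the function names are all assumptions.

```python
import queue
import threading


def run_competing_handlers(commands, handle, handler_count):
    """Let several handlers drain one queue; each command is taken once."""
    inbox = queue.Queue()
    for command in commands:
        inbox.put(command)
    published = queue.Queue()

    def worker(name):
        while True:
            try:
                command = inbox.get_nowait()
            except queue.Empty:
                return  # nothing left to do, this handler goes home
            published.put(handle(name, command))

    threads = [threading.Thread(target=worker, args=("handler-%d" % i,))
               for i in range(handler_count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return [published.get() for _ in range(published.qsize())]


def make_sum_handler(blobs):
    """Build a handler that reads a blob by reference and publishes an event."""
    def handle(handler_name, command):
        values = blobs[command["blob"]]  # immutable input data
        return {"type": "TaskProcessed",
                "task_id": command["task_id"],
                "total": sum(values),
                "handled_by": handler_name}
    return handle
```

Scaling out then amounts to raising `handler_count`, or, in a cloud deployment, the number of machines that read from the same queue.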

Both bounded contexts subscribe to all important domain events of each other.

Important: we should differentiate between actual data (which is so large that it does not fit into a single machine/process) and behavioral metadata. The former is accessed only by data processing bounded context, while the latter is passed within the messages between both bounded contexts. For example, number of time series in dataset is metadata, while actual values within these time series are actual 'raw' data.

Orchestration Aggregate would use that metadata to make decisions that 'drive' the process through the graph.

Flow of work

Let's say we have a ProcessAggregate that contains orchestration logic for our complicated MapReduce process. When this aggregate starts, it simply publishes X events that say something like:

TaskAScheduledEvent(processId = 1, taskID = guid)

Note: there is a more elegant way to do this, but that would require going deeper into DDD.

These events would be received by a Receptor (or Port) in the second bounded context, which would translate them into instances of ProcessTaskACommand. These command messages would be passed into the queue from which multiple worker machines pick their jobs.

When a command handler finishes processing the task, it sends TaskAProcessedEvent, which gets routed back to the ProcessAggregate as ConfirmTaskAResults(taskID = guid).
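The translation done by the receptors can be sketched as two small functions, one per bounded context. The message names come from the text above; the Python field names and the shape of the functions are assumptions for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TaskAScheduledEvent:
    process_id: int
    task_id: str


@dataclass(frozen=True)
class ProcessTaskACommand:
    process_id: int
    task_id: str


@dataclass(frozen=True)
class TaskAProcessedEvent:
    process_id: int
    task_id: str


@dataclass(frozen=True)
class ConfirmTaskAResults:
    process_id: int
    task_id: str


def data_processing_receptor(event):
    """Runs in Data Processing: turns orchestration events into work commands."""
    if isinstance(event, TaskAScheduledEvent):
        return ProcessTaskACommand(event.process_id, event.task_id)
    return None  # events this context does not care about are ignored


def orchestration_receptor(event):
    """Runs in Orchestration: turns results into commands for the aggregate."""
    if isinstance(event, TaskAProcessedEvent):
        return ConfirmTaskAResults(event.process_id, event.task_id)
    return None
```

Keeping the translation in one place means that each bounded context can rename or reshape its own messages without the other side noticing.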

Within the aggregate we:

  • Mark task as confirmed (if it hasn't already been reported due to message duplication).
  • If this task completes some batch and enables further processing, we schedule more tasks for cloud execution.

We can also define a timeout view that simply lists all tasks that are currently running. A timeout manager (a simple process) regularly checks this view and sends "TryTimeoutTaskX" to the aggregate. The aggregate checks its internal state and, if the task indeed has not been processed, decides to either reissue the task or terminate the whole process (yes, we essentially implement our timeout tracking as a business process within the Lokad.CQRS architecture style).
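A sketch of this timeout loop is shown below, with time passed in explicitly so that it is easy to test. The class names, the "reissue once, then terminate" policy and the string results are assumptions chosen for the example.

```python
class RunningTasksView:
    """Projection that lists tasks which were scheduled but not confirmed."""

    def __init__(self):
        self.started_at = {}

    def when_task_scheduled(self, task_id, at):
        self.started_at[task_id] = at

    def when_task_confirmed(self, task_id):
        self.started_at.pop(task_id, None)

    def overdue(self, now, timeout):
        return sorted(task_id for task_id, at in self.started_at.items()
                      if now - at >= timeout)


class ProcessAggregate:
    """Only the timeout-related decisions of the aggregate (hypothetical)."""

    def __init__(self, max_reissues=1):
        self.max_reissues = max_reissues
        self.reissues = {}
        self.confirmed = set()
        self.terminated = False

    def confirm(self, task_id):
        self.confirmed.add(task_id)

    def try_timeout(self, task_id):
        # The view may be stale, so the aggregate has the final word.
        if task_id in self.confirmed or self.terminated:
            return "ignored"
        used = self.reissues.get(task_id, 0)
        if used < self.max_reissues:
            self.reissues[task_id] = used + 1
            return "reissued"
        self.terminated = True
        return "terminated"


def timeout_manager_tick(view, aggregate, now, timeout):
    """One pass of the timeout manager over the view."""
    decisions = {}
    for task_id in view.overdue(now, timeout):
        decision = aggregate.try_timeout(task_id)
        decisions[task_id] = decision
        if decision == "reissued":
            # In a real system the view would be updated by the resulting
            # event; here we shortcut that for brevity.
            view.when_task_scheduled(task_id, now)
    return decisions
```

The view is only a hint about what might be late; the decision itself stays inside the aggregate, where it can be tested like any other behavior.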

Gotchas

Advantages of this approach (esp. if aligned with Lokad.CQRS architecture style):

  • no need to worry about persistence of complex object that represents our graph decision logic;
  • orchestration logic can be explicitly tested with the specifications (and documented as such);
  • we can easily migrate between multiple versions of the data process without downtime or stopping processes;
  • process can easily be developed and debugged on the local machine, while being deployed to any cloud afterwards;
  • we use the same approaches and ideas that are used within the Lokad.CQRS architecture style for modeling more conventional business concepts (this lowers the learning barrier and allows reusing answers to some common problems).

Drawbacks of this approach are:

  • It requires certain development discipline (and familiarity with cloud computing and AR+ES);
  • At the moment of writing, there is no prepackaged infrastructure for event sourcing that would work out-of-the-box;
  • performance of this approach would be somewhat inferior to finely tuned functional style map-reduce process implementation;
  • this is a batch-processing approach, which is not fit for real-time processing (yet).

In short, with this approach we trade some performance for development and deployment flexibility. This enables us to rapidly model and implement a big data process (especially when requirements are still changing). After the process is formalized, we can always fine-tune and optimize bottlenecks. Frequently, though, you will find that it is cheaper to add another server (worth 100 EUR per month) than to waste multiple days of brilliant developers on performance optimizations.

Heads up: the entire infrastructure does not need to be really performant, with one exception - if you are pushing hundreds of thousands of messages through a single process, then it is worth investing effort in the messaging infrastructure (e.g. direct communication with ZeroMQ), otherwise latency will kill everything. The event stream for the actual process aggregate can simply be cached in memory.

Deployment Options

Below are some deployment variations that could be used within this approach. We can implement our core processing logic without any coupling to a specific deployment environment and then deploy it in various configurations. The latter would require just re-configuration and optionally providing some specific adapter implementations (for messaging, event sourcing and large BLOB streaming).

Alternatively you can have the same project prepared for multiple deployment options from the start.

  • Local development machine:

    • orchestration bounded context runs as one thread;
    • multiple data processing command handlers run either as parallel threads or as multiple instances of a single console app;
    • file system is used for message queueing, persistence of large binary files and event streams for aggregates.
  • Windows Azure Cloud:

    • Orchestration bounded context runs in a single worker role (e.g. instance of Lokad.CQRS-based engine);
    • data processing handlers run as additional Windows Azure worker roles (you can configure them to run on X different threads within Y worker role instances);
    • Large data is streamed to Azure Blob Storage, just like event streams for AR+ES entities;
    • Azure queues are used for messaging.
  • Amazon Elastic Compute:

    • Orchestration bounded context is a single VM, while Data processing command handlers run within multiple replicas of another VM. We scale by adding or dropping instances of that second VM.
    • Amazon S3 storage is used for persisting large binary data, while local instance of RabbitMQ is used for messaging; event streams could be persisted locally within orchestration VM.

Obviously, these are just a few of the options. You can have a completely different scenario, based on the specific resources, requirements, risks and constraints within your project.

In each of these cases, elastic scaling can be done by implementing a simple task that watches the number of messages waiting in the command queue of the data processing bounded context and adjusts the number of command handler instances accordingly.
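The core of such a scaling task is a small rule that maps queue length to a desired number of handlers. A sketch follows; the function name, the "messages per handler" ratio and the bounds are hypothetical values to be tuned per project.

```python
import math


def desired_handlers(queued_messages, messages_per_handler,
                     minimum=1, maximum=20):
    """Return how many command handler instances we would like to run."""
    wanted = math.ceil(queued_messages / messages_per_handler)
    # Never drop below the minimum or exceed the budgeted maximum.
    return max(minimum, min(maximum, wanted))
```

For instance, `desired_handlers(250, 100)` asks for 3 instances. The watching task would call this rule periodically and ask the cloud fabric to add or drop instances until the actual count matches.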

Final Words

This approach is not a silver bullet. It just summarizes some limited experience gained while developing and maintaining non-realtime big data processes that could be hosted both in the cloud and on-premises. As such, it can have numerous applicability limitations (especially if you are working within a constrained enterprise environment). Some alternative approaches and references worth mentioning are available in the reading list on Big Data.

However, if you need to quickly deliver a scalable multi-step data process with one person, no money for expensive software licenses and just a few weeks of time, then this approach might give you some ideas.


Thursday
Apr 26, 2012

Continuous Delivery with Lokad.CQRS

Today I was presenting the Lokad.CQRS architecture style at Roots. As part of the presentation I created a quick deployment of the Lokad sample project, tweaked for a continuous delivery scenario.

In short, every time a change is pushed to master within this repository: Lokad.CQRS-AppHarbor, AppHarbor performs a new full deployment of this site: http://lokad-cqrs.apphb.com. It is the duty of Lokad.CQRS to rebuild any projections that might have changed in the code.

For the most fun: this deployment actually spans 2 clouds. AppHarbor (located within Amazon) deals with builds and hosting, while data is persisted on Azure. I just didn't have time to write adapters for the persistence available within AppHarbor (the entire deployment thing was delivered within a few litres of coffee and 6-8 man-hours of work).

Naturally, while working with the system within local development environment, persistence is based on dead-simple file storage.

Currently this repository is an offshoot of the latest Lokad.CQRS, created for demo purposes.

However, this experience enables a totally new development approach by massively slashing development friction. I'm tempted to push it to the master of Lokad.CQRS and keep it hooked up to this continuous deployment, while maybe even adding more domain "meat" around the inventory management concept. I think some fun could be had around a process where every accepted pull request is immediately deployed to production.

Saturday
Apr 21, 2012

DDD: Evolving Business Processes a la Lokad

As you already know, there are multiple ways to express any given core business concept in code via domain modeling (we discussed this topic in the previous article). These ways usually depend on the architecture style selected for the bounded context in which we are currently working.

For now, let's focus on one such domain concept: long-running business processes.

In a cloud-based SaaS company, we could have the following business processes (among many others):

  • if invoice has a non-zero amount and was not paid within 15 days, then send customer a reminder.
  • if customer balance stays below -5EUR for more than 30 days, then issue a lockdown.
  • if distributed computing process has not finished processing all data batches within 1 hour, then restart it once (except cases, when it was already restarted - then issue a termination alert)

As you probably already noticed, these examples share a few similarities:

  • they are aware of the passing of time and deal with it;
  • these processes express a rather complex precondition that is based on the current state of the system and leads to one or more outcomes.

Let's assume that we are dealing with a distributed system, where information about the current state is shared via events. In such a case, our business process might resemble a piece of complex event processing and would look like this from the logical perspective:

How can we implement this "Business Process" box? There are multiple alternatives, depending on the architecture style you have chosen.

For example, you can use a state machine, where each instance of state machine would correspond to a specific process instance that you are tracking. Events would then be used to navigate an instance of the state machine across the nodes. It will also use external timer service to send messages "to future" (where message is put on hold till certain time comes).

State machines are good for formalized domains. You can learn more about such approaches in the materials provided by Gregory Young and Udi Dahan.

However, when we are dealing with business processes that are rich with fuzzy logic and uncertainty and that also happen to evolve rapidly, a simpler solution might be needed. Especially when you have almost no development time to spare.

What is the simplest solution in the case of locking customers out for balance overdrafts? For instance, we can project all events to a view, which tracks all active customers that used our services and went below the threshold at some point. Then some execution component will be responsible for regularly checking this view and sending "Lockdown" to every customer whose balance has stayed below the threshold for too long.
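Such a projection can be very small. The sketch below is in Python; the class name, the shape of the "balance changed" notification and the way time is passed in are assumptions, while the -5 EUR threshold and the 30-day period are taken from the example rule earlier in this post.

```python
from datetime import datetime, timedelta


class OverdraftView:
    """Projection: which customers are below the threshold, and since when."""

    def __init__(self, threshold):
        self.threshold = threshold
        self.below_since = {}

    def when_balance_changed(self, customer_id, balance, at):
        if balance < self.threshold:
            # Keep the first moment the customer went below the threshold.
            self.below_since.setdefault(customer_id, at)
        else:
            self.below_since.pop(customer_id, None)

    def late_customers(self, now, grace):
        """Customers that stayed below the threshold longer than `grace`."""
        return sorted(customer for customer, since in self.below_since.items()
                      if now - since > grace)


def demo():
    view = OverdraftView(threshold=-5)
    start = datetime(2012, 4, 1)
    view.when_balance_changed("alice", -10, start)
    view.when_balance_changed("bob", -7, start)
    view.when_balance_changed("bob", 20, start + timedelta(days=5))
    view.when_balance_changed("carol", -6, start + timedelta(days=20))
    return view.late_customers(start + timedelta(days=31), timedelta(days=30))
```

The output of `late_customers` is just a list, so it does not matter whether an automated task or a person looking at a screen acts on it.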

This component would also need to keep in mind that certain customers require special handling and investigation before being locked out, while others can be locked right away. Naturally, these rules will be changing really often.

What is the fastest and most flexible way to implement such component in a rapidly growing and changing environment?

You simply wire view to the UI, attach a button to send "lockdown command" and ask a person from the business department to spend half an hour per week processing all late customers. This will save dev department hours on implementing these complex execution rules, testing them and then changing (as business discovers new corner cases). Essentially we let the rules evolve and change in the environment that shapes them: in the minds of business managers.

In other words, at this point we avoid large development effort with a little bit of human time.

Please keep in mind that once business processes are established and we have so many cases that processing them manually takes too much time (that should be a profitable company by then), we can always rewrite these lockdown rules as a continuously running server-side task (the rules would be mostly established by then). We could still keep the projection and the corresponding view.

At this point we invest a fixed amount of development to automate a large portion of manual work.

This gradual evolution of business processes is currently the recommended approach within Lokad.CQRS architecture style for delivery of non-formalized and rapidly changing business rules.