Dienstag, 17. März 2020

Solide HubSpot API Anbindung mit Polly

Moderne Online-Plattformen und E-Commerce Lösungen setzen auf Microservice Architekturen. In solchen Applikationslandschaften ist die Interaktion mit unterschiedlichen Web APIs immer ein zentrales Thema. Auch reguläre Applikationen, sowie Online-Portale interagieren heute meist mit externen Web APIs. Die Vorteile solcher Architekturen liegen auf der Hand, können klein gehaltene Services doch einfach bei Bedarf skaliert und auch ausgetauscht werden.

Der Einsatz von externen Web APIs kann verschiedene Seiteneffekte haben. Manchmal sind externe Systeme z.B. nicht erreichbar oder die Web API Zugriffe werden auf Grund von Überlastungsschutz gedrosselt. Dies hat direkt Auswirkungen auf die Usability und Stabilität der gesamten Applikation oder Plattform.

In diesem Blogpost zeigen wir, wie dank Polly eine Web API Anbindung stabil und robust realisiert werden kann.

Anwendungsfall

Für unser Beispiel verwenden wir eine einfache Applikation, welche von unserem HubSpot CRM alle eingetragenen Kunden- und Firmendaten auslesen und die Beziehungen zwischen diesen beiden Entitäten abfragen soll.

Da die HubSpot CRM Beziehungsdaten nur einzeln abgefragt werden können (z.B. «gib alle Kontakte von Firma X»), generieren wir bei einem Vollexport in kurzer Zeit soviele Anfragen, dass wir schnell an die Grenzen des Rate-Limiting und an das Burst-Limit von HubSpot CRM stossen. Mittels Polly soll nun eine Lösung für den Umgang mit diesen Limits realisiert werden.

HubSpot Rate-Limiting

Da HubSpot die REST APIs selbst auch für die eigene Core-Applikation verwendet, haben sie zum Schutz Request Begrenzungen eingebaut, die ein übermässiges abrufen der API verhindert. So können sie garantieren dass die API immer performant und für alle API Benutzer verfügbar sind. Dazu nutzen sie zwei verschiedene Limits, ein Burst- und ein Daily-Limit. Diese Limits sind abhängig von dem gewählten Produkt-Plan. Siehe auch https://developers.hubspot.com/apps/api_guidelines.

Burst-Limit

Das hier angegebene Limit gilt für schnell nacheinanderfolgende Abfragen und ist auf eine relativ kurze Zeit von 10 Sekunden beschränkt.

Daily-Limit

Hier wird eine Tägliche Obergrenze an Abfragen definiert. Dies sollte normalerweise für normale Applikationen gut ausreichen.

Weitere Informationen zu den Rate Limits und wie man mit diesen umgehen kann ist hier nachzulesen.

Aufbau der Beispiellösung

Für unser Beispiel verwenden wir folgende Technischen Komponenten:

Für Abfragen der HubSpot API verwenden wir RestSharp, es kann aber auch der normale HttpClient von dotnet core verwendet werden.

Aufbau der Beispiellösung

Ablauf der Applikation

Um alle Kunden- und Firmendaten inklusive Beziehungen auszulesen sind nachfolgende Schritte notwendig:

  1. Alle Firmendaten via HubSpot API lesen Hubspot API GetAllCompanies
  2. Alle Kontaktdaten via HubSpot API lesen Hubspot API GetContacts
  3. Alle Beziehungen zwischen den Firmen- und Kontaktdaten via HubSpot API lesen Hubspot API GetAssociations (DefinitionId 2)
  4. Auflösen der Beziehungen zwischen Firmen- und Kontaktdaten

Basis-Implementation ohne Polly

Zuerst müssen für unsere Zwecke die Daten-Models definiert werden, damit wir diese auch von HubSpot aus den JSON Daten konvertieren können. Folgende Models benötigen wir:

public class PropertyValue
{
	public string Value { get; set; }
}
public class CompanyEntity
{
	public long CompanyId { get; set; }
	public Dictionary<string, PropertyValue> Properties { get; set; }
}

public class ContactEntity
{
	public long Vid { get; set; }
	public Dictionary<string, PropertyValue> Properties { get; set; }
}

public class GetAllCompaniesResult
{
	public bool HasMore { get; set; }
	public long Offset { get; set; }

	public IEnumerable<CompanyEntity> Companies { get; set; }
}

public class GetAllContactsResult
{
	public bool HasMore { get; set; }
	public long VidOffset { get; set; }

	public IEnumerable<ContactEntity> Contacts { get; set; }
}
public class HubspotGetAssociationsResult
{
	public bool HasMore { get; set; }
	public long Offset { get; set; }

	public IEnumerable<long> Results { get; set; }
}

Nun können wir direkt auf die Web-API zugreifen und das erhaltene JSON ausgeben. Dazu muss noch ein HubSpot API Key angegeben werden. Hier gibts eine Anleitung, wie man den Api-Key erhält:

static async Task Main(string[] args)
{
	var client = new RestClient("https://api.hubapi.com");
	var request = new RestRequest("/companies/v2/companies/paged");
	request.AddQueryParameter("hapikey", "HUBSPOT-API-KEY");
	request.AddQueryParameter("properties", "name");
	var result = await client.ExecuteTaskAsync<GetAllCompaniesResult>(request);
	
	Console.WriteLine(result.Content);
}

Resultat:

{
  "companies": [
    {
      "portalId": 62515,
      "additionalDomains": [
      ],
      "properties": {
        "name": {
          "sourceId": "name",
          "timestamp": 1464484587592,
          "versions": [
            {
              "name": "name",
              "sourceId": "name",
              "timestamp": 1464484587592,
              "value": "Example Company",
              "source": "BIDEN",
              "sourceVid": [
                
              ]
            }
          ],
          "value": "Example Company",
          "source": "BIDEN"
        }
      },
      "isDeleted": false,
      "companyId": 115200636
    },
    {
      "portalId": 62515,
      "additionalDomains": [
        
      ],
      "properties": {
        "name": {
          "sourceId": "name",
          "timestamp": 1468832771769,
          "versions": [
            {
              "name": "name",
              "sourceId": "name",
              "timestamp": 1468832771769,
              "value": "Test Company",
              "source": "BIDEN",
              "sourceVid": [
                
              ]
            }
          ],
          "value": "Test Company",
          "source": "BIDEN"
        }
      },
      "isDeleted": false,
      "companyId": 115279791
    }
  ],
  "has-more": true,
  "offset": 115279791
}

Wenn dieses Beispiel ausgeführt wird, wird das JSON-Resultat der Companies REST API in der Konsole ausgegeben.

Da nun alle HubSpot Abfragen im Normalfall über mehr als eine Seite ausgegeben sind und wir den Paging-Mechanismus nicht jedesmal neu programmieren wollen, haben wir hierzu eine kleine Helper-Methode geschrieben, welche uns diese Funktionalität abnimmt:

private static async Task<IEnumerable<TListData>> ReadAllData<TData, TListData>(IRestClient client,
	string resource,
	Method method,
	Func<TData, object> offsetSelector,
	string offsetParameterName,
	Func<TData, IEnumerable<TListData>> resultSelector,
	Func<TData, bool> hasMoreSelector,
	Action<RestRequest> requestConfiguration = null)
{
	// Request erstellen
	var request = new RestRequest(resource, method);

	// Authentifizierung hinzufügen
	request.AddQueryParameter("hapikey", "HUBSPOT-API-KEY");

	// Request von aussen weiter konfigurieren lassen
	requestConfiguration?.Invoke(request);

	// Request absetzen und resultate selektieren
	var response = await client.ExecuteTaskAsync<TData>(request);
	var results = new List<TListData>();
	results.AddRange(resultSelector(response.Data));

	// Solange noch mehr Daten vom Server geladen werden können...
	while (hasMoreSelector(response.Data))
	{
		// ... den korrekten Offset selektieren
		var offset = offsetSelector(response.Data);
		// den Offset-Parameter im Request aktualisieren
		request.AddOrUpdateParameter(offsetParameterName, offset, ParameterType.QueryString);

		// Erneut den Request absetzen und die Resultate selektieren
		response = await client.ExecuteTaskAsync<TData>(request);
		results.AddRange(resultSelector(response.Data));
	}

	// Alle geladenen Resultate an den Aufrufer zurückgeben
	return results;
}

Es sieht zwar dank den verwendeten Generics nicht so aus, aber die Funktionsweise ist relativ simpel: Der gleiche Request wird mit aktualisiertem Offset-Parameter solange abgesetzt bis die Schnittstelle zurückmeldet, dass keine weitere Seite mehr vorhanden ist.

Damit sieht unser Aufruf nun so aus:

var result = await ReadAllData<GetAllCompaniesResult, CompanyEntity>(
	client,
	"/companies/v2/companies/paged",
	Method.GET, 
	d => d.Offset, 
	"offset",
	r => r.Companies,
	r => r.HasMore,
	req => req.AddQueryParameter("properties", "name"));

Dasselbe können wir nun für die Kontakt-Daten implementieren:

var allContacts = await ReadAllData<GetAllContactsResult, ContactEntity>(
	client,
	"/contacts/v1/lists/all/contacts/all",
	Method.GET,
	d => d.VidOffset,
	"vidOffset",
	r => r.Contacts,
	r => r.HasMore,
	req => req.AddQueryParameter("property", "firstname").AddQueryParameter("property", "lastname"));

Jetzt haben wir alle Kontakt- und Firmendaten vom HubSpot API geladen. Jetzt müssen wir für jede Firma die entsprechenden Kontakt-Id's laden. Da diese Relationen nur pro Entität einzeln geladen werden können, werden wir so viele Aufrufe tätigen müssen wie wir Firma-Einträge geladen haben.

Da dies eine grössere Anzahl an Requests zur Folge hat, werden wir dies parallel ausführen, damit wir die Resultate etwas schneller erhalten. Dazu haben wir die Helper-Methode RunWithMaxDegreeOfConcurrency geschrieben, welche uns die Aufteilung und das parallele Abarbeiten der Einträge vereinfacht.

var mappedContacts = new ConcurrentBag<Tuple<CompanyEntity, IEnumerable<ContactEntity>>>();
await RunWithMaxDegreeOfConcurrency(20, allCompanies, async company =>
{
	var mappings = (await ReadAllData<HubspotGetAssociationsResult, long>(
		client,
		${body}quot;/crm-associations/v1/associations/{company.CompanyId}/HUBSPOT_DEFINED/2",
		Method.GET,
		d => d.Offset,
		"offset",
		r => r.Results,
		r => r.HasMore)).ToArray();

	var contacts = allContacts.Where(c => mappings.Contains(c.Vid)).ToList();
	mappedContacts.Add(new Tuple<CompanyEntity, IEnumerable<ContactEntity>>(company, contacts));
});

Zum Schluss schreiben wir die Kontakte in die Konsole, damit wir auch sehen was selektiert wurde:

var companyNameAndContacts = mappedContacts.Select(c => ${body}quot;{c.Item1.Properties["name"].Value}: {string.Join(", ", c.Item2.Where(r => r.Properties.ContainsKey("firstname")).Select(r => r.Properties["firstname"].Value))}\r\n");
Console.WriteLine(string.Concat(companyNameAndContacts));

Resultat:

Wenn wir nun dieses Beispiel ausführen, erhalten wir nach kurzer Zeit von der HubSpot API Antworten mit dem StatusCode 429 (TooManyRequests). Wir haben also bereits das Rate-Limit erreicht.

Unhandled exception. System.AggregateException: One or more errors occurred. (Error loading request https://api.hubapi.com/crm-associations/v1/associations/1234567/HUBSPOT_DEFINED/2?hapikey=HUBSPOT-API-KEY: (Status Code: TooManyRequests))

Wie kann uns Polly helfen?

Diese Bibliothek stellt einfache Funktionen zur Verfügung, um Ausnahmen und Fehler mittels Regeln zu behandeln und den aufgerufenen Code ausfallsicherer zu implementieren.

Im Prinzip wird definiert, in welchen Fällen welche Aktion ausgeführt werden soll. Die Fälle die Auftreten, können normale Exceptions sein, es kann aber auch der Rückgabewert einer Methode geprüft werden. Die Aktionen welche ausgeführt werden sollen können z.B. Retry (erneuter Aufruf), Cache (ältere, gespeicherte Werte zurückgeben) oder Fallback (anderer Code-Pfad oder Statischer Wert) sein. Nachfolgend eine kurze Auflistung und Erklärung der möglichen Polly Policies:

  • Retry (Automatisch erneute Aufrufe starten nach einer definierten Wartezeit)
  • Circuit-breaker (Blockiert die Ausführung des unterliegenden Codes für eine gewisse Zeit, damit z.B. eine Schnittstelle nicht übermässig aufgerufen wird)
  • Timeout (Wartet nur eine begrenzte Zeit auf eine Antwort des unterliegenden Codes)
  • Bulkhead Isolation (Limitiert die Anzahl Aufrufe des unterliegenden Codes und arbeitet alle aufrufe nacheinander ab mit einer maximalen «Parallelität)
  • Cache (Liefert im Fehlerfall einen Wert aus dem Cache. Speichert und aktualisiert auch automatisch den Cache bei einem erfolgreichen Resultat)
  • Fallback (Definiert einen alternativen Rückgabewert oder die Ausführung eines alternativen Codes bei einem Fehler)
  • PolicyWrap (Kombiniert mehrere definierte Policies zu einer Policy)

Weitere Informationen zu den jeweiligen Policies befinden sich auf der Polly Github Seite

Erweiterung der Basis-Implementation mit Polly

Nun kommt Polly ins Spiel. Wir wissen, dass die HubSpot API ein Rate-Limit verwendet und das Burst-Limit in einem Zeitrahmen von 10 Sekunden arbeitet. Somit können wir versuchen, innerhalb der 10 Sekunden ein Retry zu versuchen. Falls dies nicht klappt, sollten wir abbrechen, um die Schnittstelle nicht länger zu "belästigen". Ausserdem müssen wir, um herauszufinden ob ein Aufruf fehlerhaft war, den StatusCode der IRestResponse prüfen.

In Polly definieren wir dazu eine Policy. Dabei definieren wir, dass die Policy nur greifen soll, wenn der StatusCode 429 (TooManyRequests) entspricht. Wenn dies der Fall ist, soll maximal 5 mal ein WaitAndRetry ausgeführt werden mit einer Wartezeit von jeweils <versuch-nummer>*2 Sekunden. So wird der erste Fehler 2 Sekunden warten, der zweite 4 usw. bis der 5. und letzte Versuch maximal 10 Sekunden wartet.

So sieht diese Policy aus:

Policy
	.HandleResult<IRestResponse<TData>>(d => d.StatusCode == HttpStatusCode.TooManyRequests)
	.WaitAndRetryAsync(5, x => TimeSpan.FromSeconds(x * 2), (response, wait) => Console.WriteLine(${body}quot;WaitAndRetry policy waiting for {wait.TotalSeconds}s."))

Um nun diese Policy beim Aufruf des Rest-Endpoints zu verwenden, kann die konfigurierte Policy mittels ExecuteAndCaptureAsync ausgeführt werden. Innerhalb dieser Methode kann als Aktion die Abfrage der REST-Schnittstelle implementiert werden.

Damit dies etwas einfacher zu handhaben ist, haben wir auch hier eine kleine Helper-Methode geschrieben:

private static async Task<IRestResponse<T>> PollyWrapClientExecute<T>(IRestClient client, IRestRequest request, IAsyncPolicy<IRestResponse<T>> policy)
{
	var policyResult = await policy.ExecuteAndCaptureAsync(() => client.ExecuteTaskAsync<T>(request));

	return (policyResult.Outcome == OutcomeType.Successful) ? policyResult.Result : new RestResponse<T>
	{
		Request = request,
		ErrorException = policyResult.FinalException
	};
}

Diese führt die Policy aus und liefert im Fehlerfall eine RestResponse mit einer ErrorException zurück.

Eingebaut in unsere ReadAllData Methode wird dies nun in Etwa so aussehen:

private static async Task<IEnumerable<TListData>> ReadAllData<TData, TListData>(IRestClient client,
	string resource,
	Method method,
	Func<TData, object> offsetSelector,
	string offsetParameterName,
	Func<TData, IEnumerable<TListData>> resultSelector,
	Func<TData, bool> hasMoreSelector,
	Action<RestRequest> requestConfiguration = null)
{
	// Polly Resilience Policy konfigurieren
	var policy = Policy
		.HandleResult<IRestResponse<TData>>(d => d.StatusCode == HttpStatusCode.TooManyRequests)
		.WaitAndRetryAsync(5, x => TimeSpan.FromSeconds(x * 2), (response, wait) => Console.WriteLine(${body}quot;WaitAndRetry policy waiting for {wait.TotalSeconds}s."));

	// Request erstellen
	var request = new RestRequest(resource, method);

	// Authentifizierung hinzufügen
	request.AddQueryParameter("hapikey", "24470fef-b167-4a89-9452-23d064b4f222");

	// Request von aussen weiter konfigurieren lassen
	requestConfiguration?.Invoke(request);

	// Request absetzen und resultate selektieren
	// Erweitert mit Polly Resilience-Policy
	var response = await PollyWrapClientExecute(client, request, policy);

	if (!response.IsSuccessful)
	{
		throw new Exception(${body}quot;Error loading request {client.BuildUri(request)}: {response.ErrorMessage} (Status Code: {response.StatusCode})");
	}
	
	var results = new List<TListData>();
	results.AddRange(resultSelector(response.Data));

	// Solange noch mehr Daten vom Server geladen werden können...
	while (hasMoreSelector(response.Data))
	{
		// ... den korrekten Offset selektieren
		var offset = offsetSelector(response.Data);
		// den Offset-Parameter im Request aktualisieren
		request.AddOrUpdateParameter(offsetParameterName, offset, ParameterType.QueryString);

		// Erneut den Request absetzen und die Resultate selektieren
		// Erweitert mit Polly Resilience-Policy
		response = await PollyWrapClientExecute(client, request, policy);
		results.AddRange(resultSelector(response.Data));
	}

	// Alle geladenen Resultate an den Aufrufer zurückgeben
	return results;
}

Resultat:

WaitAndRetry policy waiting for 2s.
WaitAndRetry policy waiting for 4s.
WaitAndRetry policy waiting for 6s.
...
ExampleCompany AG: John Doe, Jane Mueller
TestCompany AG: Fritz Frei, Doris Test
...

Wenn wir nun unser Programm erneut ausführen, werden mehrere Male Meldungen ausgegeben, dass die WaitAndRetry Policy gegriffen hat. Jedoch nach kurzer Zeit können alle geforderten Daten abgefragt werden und unsere Aufgabe kann ohne Probleme gelöst werden.

Fazit

Dank Polly lässt sich mit einfachen Mitteln eine Web API Abfrage stabilisieren, ohne dass man sich selbst um das Handling von anspruchsvollen Schnittstellen-Themen, wie z.B. Rate-Limits oder Burst-Limits kümmern muss.

Hervorzuheben ist die Flexibilität, welche Polly bietet. Die Resilience Funktionalität kann für jegliche Art von Code verwendet werden, da eine Policy-Ausführung primär ein Wrapper um einen bestimmten Code-Block ist.

Mit den integrierten Policies ist man für alle erdenklichen Fälle gerüstet, die man bei der Anbindung von Web API typischerweise antrifft.

Weiterführende Links

Autor

Christoph Keller