.NET 直连 Langfuse 发送 Trace: API 教程 (告别 Python)
2025-05-06 23:29:07
用 .NET 代码直接创建 Langfuse Trace:告别 Python 依赖
哥们,你是不是也遇到了类似的情况:手头有个 .NET 和 Python 混合的 AI 项目,Python 部分主要负责跟 OpenAI 和 Langfuse 打交道,特别是往 Langfuse 写 Trace。但为了简化部署、让整个架构更清爽,就想把 Python 那套干掉,直接用 .NET 来处理。结果发现 Langfuse 的官方 API 文档 (https://api.reference.langfuse.com/) 对于直接创建 Trace 的具体端点和报文格式说得不那么透彻,让人有点摸不着头脑。别急,这事儿有解!
一、 问题在哪?
直接点说,问题在于 Langfuse 的文档虽然全面,但对于如何从零开始、不用官方 SDK(比如 Python 或 JavaScript SDK)而是直接通过 HTTP 请求创建包含 Trace、Span、Generation 等组合结构的观测数据,得不够直观。你可能找到了 /api/ingestion
这个端点,但具体怎么构造那个 JSON 报文,特别是各个对象(Trace, Span, Generation)之间的关联,以及需要哪些最小字段集,就得费点劲琢磨了。
二、 追根溯源:Langfuse API 调用机制
Langfuse 的 Python SDK 底层其实也是在拼 HTTP 请求。所以,要搞清楚怎么直接调 API,咱们可以反向思考一下:SDK 是怎么做的?
它通常会收集一系列“事件”(Events),然后一次性批量发送到 /api/ingestion
端点。这些事件包括创建 Trace ( trace-create
)、创建 Span ( span-create
)、创建 Generation ( generation-create
) 等。关键在于这个批量发送的报文结构和各个事件对象内部的字段。
三、 解决方案:.NET 直连 Langfuse API
废话不多说,直接上干货。咱们的目标是用 .NET 的 HttpClient
来模拟 SDK 的行为,直接把观测数据推给 Langfuse。
1. 核心端点和认证
- 端点 (Endpoint):
POST /api/ingestion
- 认证 (Authentication): HTTP Basic Authentication。你需要 Langfuse 项目的 API Public Key 和 Secret Key。将它们组合成
publicKey:secretKey
,然后进行 Base64 编码,放到 HTTP 请求的Authorization
Header 里,格式为Basic <base64_encoded_credentials>
。
2. 关键数据结构 (Payload)
发送到 /api/ingestion
的报文是一个 JSON 对象,其核心是一个名为 batch
的数组,数组里包含了各种事件对象。咱们主要关注这三种:Trace, Span, Generation。
Trace 对象 (trace-create
):
代表一次完整的追踪。
id
(string, UUID): Trace 的唯一标识。你可以自己生成,或者让 Langfuse 生成(但为了后续关联 Span/Generation,建议客户端生成)。timestamp
(string, ISO 8601): Trace 创建的时间戳。name
(string, optional): Trace 的名称,方便识别。userId
(string, optional): 关联的用户 ID。metadata
(object, optional): 自定义元数据。tags
(array of strings, optional): 标签。release
(string, optional): 版本号。version
(string, optional): 应用程序版本。
// TraceCreateEvent 示例(在 batch 数组中)
{
"id": "your-generated-trace-uuid", // 事件本身的 ID,非 Trace ID
"type": "trace-create",
"timestamp": "2024-05-21T10:00:00.000Z",
"body": {
"id": "trace-id-123", // 这是 Trace 的 ID
"name": "my-dotnet-trace",
"userId": "user-x",
"metadata": { "environment": "production" },
"tags": ["dotnet", "direct-api"]
}
}
注意:/api/ingestion
接口实际接收的是一个包含 type
和 body
的顶层事件对象,body
内部才是 Trace、Span 或 Generation 自身的属性。为了简化,后续代码示例中,我们会构造 Trace
, Span
, Generation
的数据模型,并在发送前包装成这种事件格式。Langfuse SDK 实际上也是将多个事件(比如 Trace Create, Span Create, Span Update, Generation Create, Generation Update 等)打包成一个 batch
列表进行发送。
为了方便理解和代码实现,咱们直接构造一个包含所有要创建和更新的对象列表,发送给 Langfuse,它能智能处理。
以下是Langfuse期待的,经过Python SDK等验证的简化数据模型,在 /api/ingestion
中直接作为batch
数组的元素使用:
Trace 数据模型 (Trace
):
id
(string, UUID): Trace 的唯一 ID。客户端生成此 ID 。name
(string, optional): Trace 名称。userId
(string, optional): 用户 ID。metadata
(object, optional): 附加信息。tags
(array of strings, optional): 标签。release
(string, optional): Release 版本。version
(string, optional): App 版本。sessionId
(string, optional): 会话 ID。public
(boolean, optional): 是否公开。
Observation 数据模型基类 (Span 和 Generation 都算作 Observation):
id
(string, UUID): Observation 的唯一 ID。客户端生成此 ID 。traceId
(string, UUID): 此 Observation 所属的 Trace ID。必须与 Trace 的 ID 一致 。type
(string):"SPAN"
或"GENERATION"
。name
(string, optional): Observation 名称。startTime
(string, ISO 8601): 开始时间。endTime
(string, ISO 8601, optional): 结束时间。如果提供,Langfuse 会计算耗时。parentObservationId
(string, UUID, optional): 父 Observation 的 ID,用于构建层级关系。metadata
(object, optional): 附加信息。version
(string, optional): 版本。input
(any, optional): 输入数据 (例如,Span 的输入参数, Generation 的 prompt)。output
(any, optional): 输出数据 (例如,Span 的返回值, Generation 的 completion)。level
(string, optional):"DEBUG"
,"INFO"
,"WARNING"
,"ERROR"
,"DEFAULT"
。statusMessage
(string, optional): 状态消息,通常在出错时使用。
Span 数据模型 (Span
- 继承自 Observation):
代表一个操作单元或步骤。
- (继承所有 Observation 字段)
type
: 固定为"SPAN"
。
Generation 数据模型 (Generation
- 继承自 Observation):
代表一次模型(如 LLM)的调用和生成。
- (继承所有 Observation 字段)
type
: 固定为"GENERATION"
。model
(string, optional): 使用的模型名称 (例如gpt-3.5-turbo
)。modelParameters
(object, optional): 模型参数 (例如{"temperature": 0.7}
)。prompt
(any, optional): Deprecated in favor ofinput
. 但为了兼容旧版,有时仍然可见。推荐使用input
存储 prompts。completion
(any, optional): Deprecated in favor ofoutput
. 推荐使用output
存储模型响应。usage
(object, optional): Token 使用情况。input
(integer, optional): 输入 token 数。 (有些模型叫promptTokens
)output
(integer, optional): 输出 token 数。 (有些模型叫completionTokens
)total
(integer, optional): 总 token 数。unit
(string, optional):TOKENS
(默认) 或CHARACTERS
。
completionStartTime
(string, ISO 8601, optional): LLM 开始生成 completion 的时间,对于流式输出有用。
核心Payload结构 (/api/ingestion
):
{
"batch": [
// 一个 Trace 对象
{
"id": "generated-trace-id", // Trace ID
"name": "My .NET Process",
"userId": "user-123",
"timestamp": "2024-05-21T10:00:00.000Z" // Trace 的创建时间,也可以是第一个span的startTime
// ... 其他 Trace 字段
},
// 一个 Span 对象
{
"id": "generated-span-id-1", // Span ID
"traceId": "generated-trace-id", // 关联到上面的 Trace
"type": "SPAN",
"name": "Data Preprocessing",
"startTime": "2024-05-21T10:00:01.000Z",
"endTime": "2024-05-21T10:00:02.000Z",
"input": { "data": "raw_input" },
"output": { "data": "processed_input" }
// ... 其他 Span 字段
},
// 一个 Generation 对象 (作为上面 Span 的子节点)
{
"id": "generated-generation-id-1", // Generation ID
"traceId": "generated-trace-id", // 关联到上面的 Trace
"parentObservationId": "generated-span-id-1", // 关联到上面的 Span
"type": "GENERATION",
"name": "LLM Call",
"startTime": "2024-05-21T10:00:03.000Z",
"endTime": "2024-05-21T10:00:05.000Z",
"model": "gpt-4",
"input": [{ "role": "user", "content": "Translate to French: Hello" }],
"output": { "role": "assistant", "content": "Bonjour" },
"usage": { "input": 10, "output": 2 }
// ... 其他 Generation 字段
}
],
"metadata": { // 可选,一些批量元数据,比如 SDK 信息
"sdk_name": "my-dotnet-sdk",
"sdk_version": "1.0.0"
}
}
这里有个关键点:Trace 本身也被视为 batch
中的一个元素。虽然文档里它通常和 Observation(Span/Generation)区分开,但实际发送时,Trace Create/Update 和 Observation Create/Update 都可以作为 event 对象放到 batch
数组里。对于只创建 Trace 及其内部 Observations 的情况,Trace 更像是定义一个上下文,而 Spans/Generations 是这个上下文中的具体操作记录。上面简化结构中,traceId
是连接所有内容的纽带。
3. .NET 实现:上代码!
首先,咱们定义几个 C# 类来匹配上面的 JSON 结构。推荐使用 System.Text.Json
进行序列化。
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
// Langfuse API 配置
public class LangfuseConfig
{
public string PublicKey { get; set; }
public string SecretKey { get; set; }
public string BaseUrl { get; set; } = "https://cloud.langfuse.com"; // 或者你的自托管实例 URL
}
// ----- 数据模型 -----
public abstract class LangfuseEventBase
{
[JsonPropertyName("id")]
public string Id { get; set; } = Guid.NewGuid().ToString(); // 自动生成ID
[JsonPropertyName("traceId")]
public string? TraceId { get; set; } // Observation 类型需要
// 其他通用字段可以加在这里,如果适用
}
public class LangfuseTrace : LangfuseEventBase // Trace 本身在batch里也算一个event
{
public LangfuseTrace() {
// Trace的Id就是它的TraceId,Id从基类继承,所以这里直接赋值
base.TraceId = base.Id;
}
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("userId")]
public string? UserId { get; set; }
[JsonPropertyName("sessionId")]
public string? SessionId { get; set; }
[JsonPropertyName("release")]
public string? Release { get; set; }
[JsonPropertyName("version")]
public string? AppVersion { get; set; } // "version" 在 trace中通常指应用版本
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("tags")]
public List<string>? Tags { get; set; }
[JsonPropertyName("public")]
public bool? Public { get; set; }
// Trace有一个特殊的timestamp,表示创建时间,和Observation的startTime/endTime不同
// 但实际通过ingestion API批量提交时,这个timestamp常常省略,由第一个span的startTime推断
// 或者显式指定,它应该是此Trace的创建/开始时间
[JsonPropertyName("timestamp")]
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
public abstract class LangfuseObservation : LangfuseEventBase
{
[JsonPropertyName("type")]
public abstract string Type { get; } // SPAN or GENERATION
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("startTime")]
public DateTime StartTime { get; set; } = DateTime.UtcNow;
[JsonPropertyName("endTime")]
public DateTime? EndTime { get; set; }
[JsonPropertyName("parentObservationId")]
public string? ParentObservationId { get; set; }
[JsonPropertyName("input")]
public object? Input { get; set; }
[JsonPropertyName("output")]
public object? Output { get; set; }
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("version")]
public string? Version { get; set; } // Observation 通常指SDK或Model的版本
[JsonPropertyName("level")]
public string? Level { get; set; } // DEBUG, INFO, WARNING, ERROR, DEFAULT
[JsonPropertyName("statusMessage")]
public string? StatusMessage { get; set; }
}
public class LangfuseSpan : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "SPAN";
}
public class LangfuseGeneration : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "GENERATION";
[JsonPropertyName("model")]
public string? Model { get; set; }
[JsonPropertyName("modelParameters")]
public Dictionary<string, object>? ModelParameters { get; set; }
// prompt 和 completion 优先使用基类的 Input 和 Output
// public object? Prompt { get; set; } // Use Input instead
// public object? Completion { get; set; } // Use Output instead
[JsonPropertyName("usage")]
public LangfuseUsage? Usage { get; set; }
[JsonPropertyName("completionStartTime")]
public DateTime? CompletionStartTime { get; set; }
}
public class LangfuseUsage
{
// API定义为input/output/total/unit, 但 Python SDK 里常见的是 promptTokens/completionTokens
// 为与API文档一致,我们用input/output。 如果用旧字段名,需要确保 Langfuse 能正确解析
[JsonPropertyName("input")]
public int? InputTokens { get; set; }
[JsonPropertyName("output")]
public int? OutputTokens { get; set; }
[JsonPropertyName("total")]
public int? TotalTokens { get; set; }
[JsonPropertyName("unit")]
public string Unit { get; set; } = "TOKENS"; // TOKENS or CHARACTERS
}
public class LangfuseIngestionRequest
{
[JsonPropertyName("batch")]
public List<LangfuseEventBase> Batch { get; set; } = new List<LangfuseEventBase>();
[JsonPropertyName("metadata")]
public Dictionary<string, string>? SdkMetadata { get; set; } // Optional SDK metadata
}
// Langfuse API 客户端
public class LangfuseClient
{
private readonly HttpClient _httpClient;
private readonly LangfuseConfig _config;
private readonly JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase // 虽然我们用了JsonPropertyName,但这个也设置下
};
public LangfuseClient(LangfuseConfig config, HttpClient? httpClient = null)
{
_config = config;
_httpClient = httpClient ?? new HttpClient();
_httpClient.BaseAddress = new Uri(_config.BaseUrl);
var byteArray = Encoding.UTF8.GetBytes(using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
// Langfuse API 配置
public class LangfuseConfig
{
public string PublicKey { get; set; }
public string SecretKey { get; set; }
public string BaseUrl { get; set; } = "https://cloud.langfuse.com"; // 或者你的自托管实例 URL
}
// ----- 数据模型 -----
public abstract class LangfuseEventBase
{
[JsonPropertyName("id")]
public string Id { get; set; } = Guid.NewGuid().ToString(); // 自动生成ID
[JsonPropertyName("traceId")]
public string? TraceId { get; set; } // Observation 类型需要
// 其他通用字段可以加在这里,如果适用
}
public class LangfuseTrace : LangfuseEventBase // Trace 本身在batch里也算一个event
{
public LangfuseTrace() {
// Trace的Id就是它的TraceId,Id从基类继承,所以这里直接赋值
base.TraceId = base.Id;
}
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("userId")]
public string? UserId { get; set; }
[JsonPropertyName("sessionId")]
public string? SessionId { get; set; }
[JsonPropertyName("release")]
public string? Release { get; set; }
[JsonPropertyName("version")]
public string? AppVersion { get; set; } // "version" 在 trace中通常指应用版本
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("tags")]
public List<string>? Tags { get; set; }
[JsonPropertyName("public")]
public bool? Public { get; set; }
// Trace有一个特殊的timestamp,表示创建时间,和Observation的startTime/endTime不同
// 但实际通过ingestion API批量提交时,这个timestamp常常省略,由第一个span的startTime推断
// 或者显式指定,它应该是此Trace的创建/开始时间
[JsonPropertyName("timestamp")]
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
public abstract class LangfuseObservation : LangfuseEventBase
{
[JsonPropertyName("type")]
public abstract string Type { get; } // SPAN or GENERATION
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("startTime")]
public DateTime StartTime { get; set; } = DateTime.UtcNow;
[JsonPropertyName("endTime")]
public DateTime? EndTime { get; set; }
[JsonPropertyName("parentObservationId")]
public string? ParentObservationId { get; set; }
[JsonPropertyName("input")]
public object? Input { get; set; }
[JsonPropertyName("output")]
public object? Output { get; set; }
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("version")]
public string? Version { get; set; } // Observation 通常指SDK或Model的版本
[JsonPropertyName("level")]
public string? Level { get; set; } // DEBUG, INFO, WARNING, ERROR, DEFAULT
[JsonPropertyName("statusMessage")]
public string? StatusMessage { get; set; }
}
public class LangfuseSpan : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "SPAN";
}
public class LangfuseGeneration : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "GENERATION";
[JsonPropertyName("model")]
public string? Model { get; set; }
[JsonPropertyName("modelParameters")]
public Dictionary<string, object>? ModelParameters { get; set; }
// prompt 和 completion 优先使用基类的 Input 和 Output
// public object? Prompt { get; set; } // Use Input instead
// public object? Completion { get; set; } // Use Output instead
[JsonPropertyName("usage")]
public LangfuseUsage? Usage { get; set; }
[JsonPropertyName("completionStartTime")]
public DateTime? CompletionStartTime { get; set; }
}
public class LangfuseUsage
{
// API定义为input/output/total/unit, 但 Python SDK 里常见的是 promptTokens/completionTokens
// 为与API文档一致,我们用input/output。 如果用旧字段名,需要确保 Langfuse 能正确解析
[JsonPropertyName("input")]
public int? InputTokens { get; set; }
[JsonPropertyName("output")]
public int? OutputTokens { get; set; }
[JsonPropertyName("total")]
public int? TotalTokens { get; set; }
[JsonPropertyName("unit")]
public string Unit { get; set; } = "TOKENS"; // TOKENS or CHARACTERS
}
public class LangfuseIngestionRequest
{
[JsonPropertyName("batch")]
public List<LangfuseEventBase> Batch { get; set; } = new List<LangfuseEventBase>();
[JsonPropertyName("metadata")]
public Dictionary<string, string>? SdkMetadata { get; set; } // Optional SDK metadata
}
// Langfuse API 客户端
public class LangfuseClient
{
private readonly HttpClient _httpClient;
private readonly LangfuseConfig _config;
private readonly JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase // 虽然我们用了JsonPropertyName,但这个也设置下
};
public LangfuseClient(LangfuseConfig config, HttpClient? httpClient = null)
{
_config = config;
_httpClient = httpClient ?? new HttpClient();
_httpClient.BaseAddress = new Uri(_config.BaseUrl);
var byteArray = Encoding.UTF8.GetBytes($"{_config.PublicKey}:{_config.SecretKey}");
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
}
public async Task IngestAsync(LangfuseIngestionRequest request)
{
// 添加默认的SDK元数据
request.SdkMetadata ??= new Dictionary<string, string>
{
{ "sdk_name", "my-custom-dotnet-langfuse" },
{ "sdk_version", "0.1.0" }
};
var jsonPayload = JsonSerializer.Serialize(request, _jsonSerializerOptions);
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
// Console.WriteLine($"Sending to Langfuse: {jsonPayload}"); // 调试用
var response = await _httpClient.PostAsync("/api/ingestion", content);
if (!response.IsSuccessStatusCode)
{
var errorContent = await response.Content.ReadAsStringAsync();
// 你可以在这里抛出更具体的异常,或者记录日志
throw new HttpRequestException($"Langfuse API request failed with status code {response.StatusCode}: {errorContent}");
}
// 成功后,可以记录一下,或者什么都不做
// Console.WriteLine("Successfully ingested events to Langfuse.");
}
}
quot;{_config.PublicKey}:{_config.SecretKey}");
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
}
public async Task IngestAsync(LangfuseIngestionRequest request)
{
// 添加默认的SDK元数据
request.SdkMetadata ??= new Dictionary<string, string>
{
{ "sdk_name", "my-custom-dotnet-langfuse" },
{ "sdk_version", "0.1.0" }
};
var jsonPayload = JsonSerializer.Serialize(request, _jsonSerializerOptions);
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
// Console.WriteLine(using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
// Langfuse API 配置
public class LangfuseConfig
{
public string PublicKey { get; set; }
public string SecretKey { get; set; }
public string BaseUrl { get; set; } = "https://cloud.langfuse.com"; // 或者你的自托管实例 URL
}
// ----- 数据模型 -----
public abstract class LangfuseEventBase
{
[JsonPropertyName("id")]
public string Id { get; set; } = Guid.NewGuid().ToString(); // 自动生成ID
[JsonPropertyName("traceId")]
public string? TraceId { get; set; } // Observation 类型需要
// 其他通用字段可以加在这里,如果适用
}
public class LangfuseTrace : LangfuseEventBase // Trace 本身在batch里也算一个event
{
public LangfuseTrace() {
// Trace的Id就是它的TraceId,Id从基类继承,所以这里直接赋值
base.TraceId = base.Id;
}
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("userId")]
public string? UserId { get; set; }
[JsonPropertyName("sessionId")]
public string? SessionId { get; set; }
[JsonPropertyName("release")]
public string? Release { get; set; }
[JsonPropertyName("version")]
public string? AppVersion { get; set; } // "version" 在 trace中通常指应用版本
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("tags")]
public List<string>? Tags { get; set; }
[JsonPropertyName("public")]
public bool? Public { get; set; }
// Trace有一个特殊的timestamp,表示创建时间,和Observation的startTime/endTime不同
// 但实际通过ingestion API批量提交时,这个timestamp常常省略,由第一个span的startTime推断
// 或者显式指定,它应该是此Trace的创建/开始时间
[JsonPropertyName("timestamp")]
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
public abstract class LangfuseObservation : LangfuseEventBase
{
[JsonPropertyName("type")]
public abstract string Type { get; } // SPAN or GENERATION
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("startTime")]
public DateTime StartTime { get; set; } = DateTime.UtcNow;
[JsonPropertyName("endTime")]
public DateTime? EndTime { get; set; }
[JsonPropertyName("parentObservationId")]
public string? ParentObservationId { get; set; }
[JsonPropertyName("input")]
public object? Input { get; set; }
[JsonPropertyName("output")]
public object? Output { get; set; }
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("version")]
public string? Version { get; set; } // Observation 通常指SDK或Model的版本
[JsonPropertyName("level")]
public string? Level { get; set; } // DEBUG, INFO, WARNING, ERROR, DEFAULT
[JsonPropertyName("statusMessage")]
public string? StatusMessage { get; set; }
}
public class LangfuseSpan : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "SPAN";
}
public class LangfuseGeneration : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "GENERATION";
[JsonPropertyName("model")]
public string? Model { get; set; }
[JsonPropertyName("modelParameters")]
public Dictionary<string, object>? ModelParameters { get; set; }
// prompt 和 completion 优先使用基类的 Input 和 Output
// public object? Prompt { get; set; } // Use Input instead
// public object? Completion { get; set; } // Use Output instead
[JsonPropertyName("usage")]
public LangfuseUsage? Usage { get; set; }
[JsonPropertyName("completionStartTime")]
public DateTime? CompletionStartTime { get; set; }
}
public class LangfuseUsage
{
// API定义为input/output/total/unit, 但 Python SDK 里常见的是 promptTokens/completionTokens
// 为与API文档一致,我们用input/output。 如果用旧字段名,需要确保 Langfuse 能正确解析
[JsonPropertyName("input")]
public int? InputTokens { get; set; }
[JsonPropertyName("output")]
public int? OutputTokens { get; set; }
[JsonPropertyName("total")]
public int? TotalTokens { get; set; }
[JsonPropertyName("unit")]
public string Unit { get; set; } = "TOKENS"; // TOKENS or CHARACTERS
}
public class LangfuseIngestionRequest
{
[JsonPropertyName("batch")]
public List<LangfuseEventBase> Batch { get; set; } = new List<LangfuseEventBase>();
[JsonPropertyName("metadata")]
public Dictionary<string, string>? SdkMetadata { get; set; } // Optional SDK metadata
}
// Langfuse API 客户端
public class LangfuseClient
{
private readonly HttpClient _httpClient;
private readonly LangfuseConfig _config;
private readonly JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase // 虽然我们用了JsonPropertyName,但这个也设置下
};
public LangfuseClient(LangfuseConfig config, HttpClient? httpClient = null)
{
_config = config;
_httpClient = httpClient ?? new HttpClient();
_httpClient.BaseAddress = new Uri(_config.BaseUrl);
var byteArray = Encoding.UTF8.GetBytes($"{_config.PublicKey}:{_config.SecretKey}");
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
}
public async Task IngestAsync(LangfuseIngestionRequest request)
{
// 添加默认的SDK元数据
request.SdkMetadata ??= new Dictionary<string, string>
{
{ "sdk_name", "my-custom-dotnet-langfuse" },
{ "sdk_version", "0.1.0" }
};
var jsonPayload = JsonSerializer.Serialize(request, _jsonSerializerOptions);
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
// Console.WriteLine($"Sending to Langfuse: {jsonPayload}"); // 调试用
var response = await _httpClient.PostAsync("/api/ingestion", content);
if (!response.IsSuccessStatusCode)
{
var errorContent = await response.Content.ReadAsStringAsync();
// 你可以在这里抛出更具体的异常,或者记录日志
throw new HttpRequestException($"Langfuse API request failed with status code {response.StatusCode}: {errorContent}");
}
// 成功后,可以记录一下,或者什么都不做
// Console.WriteLine("Successfully ingested events to Langfuse.");
}
}
quot;Sending to Langfuse: {jsonPayload}"); // 调试用
var response = await _httpClient.PostAsync("/api/ingestion", content);
if (!response.IsSuccessStatusCode)
{
var errorContent = await response.Content.ReadAsStringAsync();
// 你可以在这里抛出更具体的异常,或者记录日志
throw new HttpRequestException(using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
// Langfuse API 配置
public class LangfuseConfig
{
public string PublicKey { get; set; }
public string SecretKey { get; set; }
public string BaseUrl { get; set; } = "https://cloud.langfuse.com"; // 或者你的自托管实例 URL
}
// ----- 数据模型 -----
public abstract class LangfuseEventBase
{
[JsonPropertyName("id")]
public string Id { get; set; } = Guid.NewGuid().ToString(); // 自动生成ID
[JsonPropertyName("traceId")]
public string? TraceId { get; set; } // Observation 类型需要
// 其他通用字段可以加在这里,如果适用
}
public class LangfuseTrace : LangfuseEventBase // Trace 本身在batch里也算一个event
{
public LangfuseTrace() {
// Trace的Id就是它的TraceId,Id从基类继承,所以这里直接赋值
base.TraceId = base.Id;
}
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("userId")]
public string? UserId { get; set; }
[JsonPropertyName("sessionId")]
public string? SessionId { get; set; }
[JsonPropertyName("release")]
public string? Release { get; set; }
[JsonPropertyName("version")]
public string? AppVersion { get; set; } // "version" 在 trace中通常指应用版本
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("tags")]
public List<string>? Tags { get; set; }
[JsonPropertyName("public")]
public bool? Public { get; set; }
// Trace有一个特殊的timestamp,表示创建时间,和Observation的startTime/endTime不同
// 但实际通过ingestion API批量提交时,这个timestamp常常省略,由第一个span的startTime推断
// 或者显式指定,它应该是此Trace的创建/开始时间
[JsonPropertyName("timestamp")]
public DateTime Timestamp { get; set; } = DateTime.UtcNow;
}
public abstract class LangfuseObservation : LangfuseEventBase
{
[JsonPropertyName("type")]
public abstract string Type { get; } // SPAN or GENERATION
[JsonPropertyName("name")]
public string? Name { get; set; }
[JsonPropertyName("startTime")]
public DateTime StartTime { get; set; } = DateTime.UtcNow;
[JsonPropertyName("endTime")]
public DateTime? EndTime { get; set; }
[JsonPropertyName("parentObservationId")]
public string? ParentObservationId { get; set; }
[JsonPropertyName("input")]
public object? Input { get; set; }
[JsonPropertyName("output")]
public object? Output { get; set; }
[JsonPropertyName("metadata")]
public object? Metadata { get; set; }
[JsonPropertyName("version")]
public string? Version { get; set; } // Observation 通常指SDK或Model的版本
[JsonPropertyName("level")]
public string? Level { get; set; } // DEBUG, INFO, WARNING, ERROR, DEFAULT
[JsonPropertyName("statusMessage")]
public string? StatusMessage { get; set; }
}
public class LangfuseSpan : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "SPAN";
}
public class LangfuseGeneration : LangfuseObservation
{
[JsonPropertyName("type")]
public override string Type => "GENERATION";
[JsonPropertyName("model")]
public string? Model { get; set; }
[JsonPropertyName("modelParameters")]
public Dictionary<string, object>? ModelParameters { get; set; }
// prompt 和 completion 优先使用基类的 Input 和 Output
// public object? Prompt { get; set; } // Use Input instead
// public object? Completion { get; set; } // Use Output instead
[JsonPropertyName("usage")]
public LangfuseUsage? Usage { get; set; }
[JsonPropertyName("completionStartTime")]
public DateTime? CompletionStartTime { get; set; }
}
public class LangfuseUsage
{
// API定义为input/output/total/unit, 但 Python SDK 里常见的是 promptTokens/completionTokens
// 为与API文档一致,我们用input/output。 如果用旧字段名,需要确保 Langfuse 能正确解析
[JsonPropertyName("input")]
public int? InputTokens { get; set; }
[JsonPropertyName("output")]
public int? OutputTokens { get; set; }
[JsonPropertyName("total")]
public int? TotalTokens { get; set; }
[JsonPropertyName("unit")]
public string Unit { get; set; } = "TOKENS"; // TOKENS or CHARACTERS
}
public class LangfuseIngestionRequest
{
[JsonPropertyName("batch")]
public List<LangfuseEventBase> Batch { get; set; } = new List<LangfuseEventBase>();
[JsonPropertyName("metadata")]
public Dictionary<string, string>? SdkMetadata { get; set; } // Optional SDK metadata
}
// Langfuse API 客户端
public class LangfuseClient
{
private readonly HttpClient _httpClient;
private readonly LangfuseConfig _config;
private readonly JsonSerializerOptions _jsonSerializerOptions = new JsonSerializerOptions
{
DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull,
PropertyNamingPolicy = JsonNamingPolicy.CamelCase // 虽然我们用了JsonPropertyName,但这个也设置下
};
public LangfuseClient(LangfuseConfig config, HttpClient? httpClient = null)
{
_config = config;
_httpClient = httpClient ?? new HttpClient();
_httpClient.BaseAddress = new Uri(_config.BaseUrl);
var byteArray = Encoding.UTF8.GetBytes($"{_config.PublicKey}:{_config.SecretKey}");
_httpClient.DefaultRequestHeaders.Authorization =
new AuthenticationHeaderValue("Basic", Convert.ToBase64String(byteArray));
}
public async Task IngestAsync(LangfuseIngestionRequest request)
{
// 添加默认的SDK元数据
request.SdkMetadata ??= new Dictionary<string, string>
{
{ "sdk_name", "my-custom-dotnet-langfuse" },
{ "sdk_version", "0.1.0" }
};
var jsonPayload = JsonSerializer.Serialize(request, _jsonSerializerOptions);
var content = new StringContent(jsonPayload, Encoding.UTF8, "application/json");
// Console.WriteLine($"Sending to Langfuse: {jsonPayload}"); // 调试用
var response = await _httpClient.PostAsync("/api/ingestion", content);
if (!response.IsSuccessStatusCode)
{
var errorContent = await response.Content.ReadAsStringAsync();
// 你可以在这里抛出更具体的异常,或者记录日志
throw new HttpRequestException($"Langfuse API request failed with status code {response.StatusCode}: {errorContent}");
}
// 成功后,可以记录一下,或者什么都不做
// Console.WriteLine("Successfully ingested events to Langfuse.");
}
}
quot;Langfuse API request failed with status code {response.StatusCode}: {errorContent}");
}
// 成功后,可以记录一下,或者什么都不做
// Console.WriteLine("Successfully ingested events to Langfuse.");
}
}
使用示例:
public class MyAIService
{
private readonly LangfuseClient _langfuseClient;
public MyAIService(LangfuseClient langfuseClient)
{
_langfuseClient = langfuseClient;
}
public async Task ProcessRequestAsync(string userInput)
{
// 1. 创建一个 Trace 对象
var trace = new LangfuseTrace
{
Name = "User Query Processing (.NET)",
UserId = "test-user-dotnet",
Tags = new List<string> { "dotnet_example" },
Metadata = new { requestSource = "api_v2" }
};
// 准备批量提交的事件列表
var eventsToIngest = new List<LangfuseEventBase> { trace };
// 2. 创建一个顶层 Span (数据预处理)
var preprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id, // 关联到上面的 Trace
Name = "Data Preprocessing",
StartTime = DateTime.UtcNow,
Input = new { rawInput = userInput }
};
eventsToIngest.Add(preprocessingSpan);
// 模拟一些处理
await Task.Delay(100); // 模拟耗时
string processedInput = public class MyAIService
{
private readonly LangfuseClient _langfuseClient;
public MyAIService(LangfuseClient langfuseClient)
{
_langfuseClient = langfuseClient;
}
public async Task ProcessRequestAsync(string userInput)
{
// 1. 创建一个 Trace 对象
var trace = new LangfuseTrace
{
Name = "User Query Processing (.NET)",
UserId = "test-user-dotnet",
Tags = new List<string> { "dotnet_example" },
Metadata = new { requestSource = "api_v2" }
};
// 准备批量提交的事件列表
var eventsToIngest = new List<LangfuseEventBase> { trace };
// 2. 创建一个顶层 Span (数据预处理)
var preprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id, // 关联到上面的 Trace
Name = "Data Preprocessing",
StartTime = DateTime.UtcNow,
Input = new { rawInput = userInput }
};
eventsToIngest.Add(preprocessingSpan);
// 模拟一些处理
await Task.Delay(100); // 模拟耗时
string processedInput = $"Processed: {userInput.ToUpper()}";
preprocessingSpan.Output = new { processedData = processedInput };
preprocessingSpan.EndTime = DateTime.UtcNow;
// 3. 创建一个 Generation (LLM 调用),作为上一个 Span 的子节点
var llmGeneration = new LangfuseGeneration
{
TraceId = trace.Id, // 关联到 Trace
ParentObservationId = preprocessingSpan.Id, // 父节点是预处理 Span
Name = "OpenAI Call - gpt-3.5-turbo",
StartTime = DateTime.UtcNow,
Model = "gpt-3.5-turbo",
Input = new [] // OpenAI messages format
{
new { role = "system", content = "You are a helpful assistant." },
new { role = "user", content = processedInput }
},
ModelParameters = new Dictionary<string, object> { { "temperature", 0.7 } }
};
eventsToIngest.Add(llmGeneration);
// 模拟 LLM 调用和获取结果
await Task.Delay(300); // 模拟 LLM 耗时
string llmResponse = $"LLM Response to: {processedInput}";
llmGeneration.Output = new { role = "assistant", content = llmResponse };
llmGeneration.Usage = new LangfuseUsage
{
InputTokens = 50, // 假设值
OutputTokens = 30 // 假设值
};
llmGeneration.EndTime = DateTime.UtcNow;
// 4. 创建另一个 Span (后处理),也是 Trace 的直接子节点
var postprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id,
Name = "Response Postprocessing",
StartTime = DateTime.UtcNow,
Input = new { llmOutput = llmResponse }
};
eventsToIngest.Add(postprocessingSpan);
await Task.Delay(50);
string finalResponse = $"Final: {llmResponse}!!!";
postprocessingSpan.Output = new { finalData = finalResponse };
postprocessingSpan.EndTime = DateTime.UtcNow;
// 准备 ingestion 请求
var ingestionRequest = new LangfuseIngestionRequest
{
Batch = eventsToIngest
};
// 发送!
try
{
await _langfuseClient.IngestAsync(ingestionRequest);
Console.WriteLine("Trace successfully sent to Langfuse!");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending trace to Langfuse: {ex.Message}");
// 在这里可以添加更完善的错误处理逻辑,比如重试或记录到备用日志系统
}
}
}
// ----- 主程序入口示例 (例如 Program.cs) -----
// public class Program
// {
// public static async Task Main(string[] args)
// {
// var config = new LangfuseConfig
// {
// PublicKey = Environment.GetEnvironmentVariable("LANGFUSE_PUBLIC_KEY") ?? "your_public_key",
// SecretKey = Environment.GetEnvironmentVariable("LANGFUSE_SECRET_KEY") ?? "your_secret_key",
// // BaseUrl = "https://your-self-hosted-langfuse.com" // 如果自托管
// };
// if (config.PublicKey == "your_public_key" || config.SecretKey == "your_secret_key")
// {
// Console.WriteLine("Please set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables.");
// return;
// }
// var langfuseClient = new LangfuseClient(config);
// var myService = new MyAIService(langfuseClient);
// await myService.ProcessRequestAsync("Hello Langfuse from .NET!");
// }
// }
quot;Processed: {userInput.ToUpper()}";
preprocessingSpan.Output = new { processedData = processedInput };
preprocessingSpan.EndTime = DateTime.UtcNow;
// 3. 创建一个 Generation (LLM 调用),作为上一个 Span 的子节点
var llmGeneration = new LangfuseGeneration
{
TraceId = trace.Id, // 关联到 Trace
ParentObservationId = preprocessingSpan.Id, // 父节点是预处理 Span
Name = "OpenAI Call - gpt-3.5-turbo",
StartTime = DateTime.UtcNow,
Model = "gpt-3.5-turbo",
Input = new [] // OpenAI messages format
{
new { role = "system", content = "You are a helpful assistant." },
new { role = "user", content = processedInput }
},
ModelParameters = new Dictionary<string, object> { { "temperature", 0.7 } }
};
eventsToIngest.Add(llmGeneration);
// 模拟 LLM 调用和获取结果
await Task.Delay(300); // 模拟 LLM 耗时
string llmResponse = public class MyAIService
{
private readonly LangfuseClient _langfuseClient;
public MyAIService(LangfuseClient langfuseClient)
{
_langfuseClient = langfuseClient;
}
public async Task ProcessRequestAsync(string userInput)
{
// 1. 创建一个 Trace 对象
var trace = new LangfuseTrace
{
Name = "User Query Processing (.NET)",
UserId = "test-user-dotnet",
Tags = new List<string> { "dotnet_example" },
Metadata = new { requestSource = "api_v2" }
};
// 准备批量提交的事件列表
var eventsToIngest = new List<LangfuseEventBase> { trace };
// 2. 创建一个顶层 Span (数据预处理)
var preprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id, // 关联到上面的 Trace
Name = "Data Preprocessing",
StartTime = DateTime.UtcNow,
Input = new { rawInput = userInput }
};
eventsToIngest.Add(preprocessingSpan);
// 模拟一些处理
await Task.Delay(100); // 模拟耗时
string processedInput = $"Processed: {userInput.ToUpper()}";
preprocessingSpan.Output = new { processedData = processedInput };
preprocessingSpan.EndTime = DateTime.UtcNow;
// 3. 创建一个 Generation (LLM 调用),作为上一个 Span 的子节点
var llmGeneration = new LangfuseGeneration
{
TraceId = trace.Id, // 关联到 Trace
ParentObservationId = preprocessingSpan.Id, // 父节点是预处理 Span
Name = "OpenAI Call - gpt-3.5-turbo",
StartTime = DateTime.UtcNow,
Model = "gpt-3.5-turbo",
Input = new [] // OpenAI messages format
{
new { role = "system", content = "You are a helpful assistant." },
new { role = "user", content = processedInput }
},
ModelParameters = new Dictionary<string, object> { { "temperature", 0.7 } }
};
eventsToIngest.Add(llmGeneration);
// 模拟 LLM 调用和获取结果
await Task.Delay(300); // 模拟 LLM 耗时
string llmResponse = $"LLM Response to: {processedInput}";
llmGeneration.Output = new { role = "assistant", content = llmResponse };
llmGeneration.Usage = new LangfuseUsage
{
InputTokens = 50, // 假设值
OutputTokens = 30 // 假设值
};
llmGeneration.EndTime = DateTime.UtcNow;
// 4. 创建另一个 Span (后处理),也是 Trace 的直接子节点
var postprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id,
Name = "Response Postprocessing",
StartTime = DateTime.UtcNow,
Input = new { llmOutput = llmResponse }
};
eventsToIngest.Add(postprocessingSpan);
await Task.Delay(50);
string finalResponse = $"Final: {llmResponse}!!!";
postprocessingSpan.Output = new { finalData = finalResponse };
postprocessingSpan.EndTime = DateTime.UtcNow;
// 准备 ingestion 请求
var ingestionRequest = new LangfuseIngestionRequest
{
Batch = eventsToIngest
};
// 发送!
try
{
await _langfuseClient.IngestAsync(ingestionRequest);
Console.WriteLine("Trace successfully sent to Langfuse!");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending trace to Langfuse: {ex.Message}");
// 在这里可以添加更完善的错误处理逻辑,比如重试或记录到备用日志系统
}
}
}
// ----- 主程序入口示例 (例如 Program.cs) -----
// public class Program
// {
// public static async Task Main(string[] args)
// {
// var config = new LangfuseConfig
// {
// PublicKey = Environment.GetEnvironmentVariable("LANGFUSE_PUBLIC_KEY") ?? "your_public_key",
// SecretKey = Environment.GetEnvironmentVariable("LANGFUSE_SECRET_KEY") ?? "your_secret_key",
// // BaseUrl = "https://your-self-hosted-langfuse.com" // 如果自托管
// };
// if (config.PublicKey == "your_public_key" || config.SecretKey == "your_secret_key")
// {
// Console.WriteLine("Please set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables.");
// return;
// }
// var langfuseClient = new LangfuseClient(config);
// var myService = new MyAIService(langfuseClient);
// await myService.ProcessRequestAsync("Hello Langfuse from .NET!");
// }
// }
quot;LLM Response to: {processedInput}";
llmGeneration.Output = new { role = "assistant", content = llmResponse };
llmGeneration.Usage = new LangfuseUsage
{
InputTokens = 50, // 假设值
OutputTokens = 30 // 假设值
};
llmGeneration.EndTime = DateTime.UtcNow;
// 4. 创建另一个 Span (后处理),也是 Trace 的直接子节点
var postprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id,
Name = "Response Postprocessing",
StartTime = DateTime.UtcNow,
Input = new { llmOutput = llmResponse }
};
eventsToIngest.Add(postprocessingSpan);
await Task.Delay(50);
string finalResponse = public class MyAIService
{
private readonly LangfuseClient _langfuseClient;
public MyAIService(LangfuseClient langfuseClient)
{
_langfuseClient = langfuseClient;
}
public async Task ProcessRequestAsync(string userInput)
{
// 1. 创建一个 Trace 对象
var trace = new LangfuseTrace
{
Name = "User Query Processing (.NET)",
UserId = "test-user-dotnet",
Tags = new List<string> { "dotnet_example" },
Metadata = new { requestSource = "api_v2" }
};
// 准备批量提交的事件列表
var eventsToIngest = new List<LangfuseEventBase> { trace };
// 2. 创建一个顶层 Span (数据预处理)
var preprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id, // 关联到上面的 Trace
Name = "Data Preprocessing",
StartTime = DateTime.UtcNow,
Input = new { rawInput = userInput }
};
eventsToIngest.Add(preprocessingSpan);
// 模拟一些处理
await Task.Delay(100); // 模拟耗时
string processedInput = $"Processed: {userInput.ToUpper()}";
preprocessingSpan.Output = new { processedData = processedInput };
preprocessingSpan.EndTime = DateTime.UtcNow;
// 3. 创建一个 Generation (LLM 调用),作为上一个 Span 的子节点
var llmGeneration = new LangfuseGeneration
{
TraceId = trace.Id, // 关联到 Trace
ParentObservationId = preprocessingSpan.Id, // 父节点是预处理 Span
Name = "OpenAI Call - gpt-3.5-turbo",
StartTime = DateTime.UtcNow,
Model = "gpt-3.5-turbo",
Input = new [] // OpenAI messages format
{
new { role = "system", content = "You are a helpful assistant." },
new { role = "user", content = processedInput }
},
ModelParameters = new Dictionary<string, object> { { "temperature", 0.7 } }
};
eventsToIngest.Add(llmGeneration);
// 模拟 LLM 调用和获取结果
await Task.Delay(300); // 模拟 LLM 耗时
string llmResponse = $"LLM Response to: {processedInput}";
llmGeneration.Output = new { role = "assistant", content = llmResponse };
llmGeneration.Usage = new LangfuseUsage
{
InputTokens = 50, // 假设值
OutputTokens = 30 // 假设值
};
llmGeneration.EndTime = DateTime.UtcNow;
// 4. 创建另一个 Span (后处理),也是 Trace 的直接子节点
var postprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id,
Name = "Response Postprocessing",
StartTime = DateTime.UtcNow,
Input = new { llmOutput = llmResponse }
};
eventsToIngest.Add(postprocessingSpan);
await Task.Delay(50);
string finalResponse = $"Final: {llmResponse}!!!";
postprocessingSpan.Output = new { finalData = finalResponse };
postprocessingSpan.EndTime = DateTime.UtcNow;
// 准备 ingestion 请求
var ingestionRequest = new LangfuseIngestionRequest
{
Batch = eventsToIngest
};
// 发送!
try
{
await _langfuseClient.IngestAsync(ingestionRequest);
Console.WriteLine("Trace successfully sent to Langfuse!");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending trace to Langfuse: {ex.Message}");
// 在这里可以添加更完善的错误处理逻辑,比如重试或记录到备用日志系统
}
}
}
// ----- 主程序入口示例 (例如 Program.cs) -----
// public class Program
// {
// public static async Task Main(string[] args)
// {
// var config = new LangfuseConfig
// {
// PublicKey = Environment.GetEnvironmentVariable("LANGFUSE_PUBLIC_KEY") ?? "your_public_key",
// SecretKey = Environment.GetEnvironmentVariable("LANGFUSE_SECRET_KEY") ?? "your_secret_key",
// // BaseUrl = "https://your-self-hosted-langfuse.com" // 如果自托管
// };
// if (config.PublicKey == "your_public_key" || config.SecretKey == "your_secret_key")
// {
// Console.WriteLine("Please set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables.");
// return;
// }
// var langfuseClient = new LangfuseClient(config);
// var myService = new MyAIService(langfuseClient);
// await myService.ProcessRequestAsync("Hello Langfuse from .NET!");
// }
// }
quot;Final: {llmResponse}!!!";
postprocessingSpan.Output = new { finalData = finalResponse };
postprocessingSpan.EndTime = DateTime.UtcNow;
// 准备 ingestion 请求
var ingestionRequest = new LangfuseIngestionRequest
{
Batch = eventsToIngest
};
// 发送!
try
{
await _langfuseClient.IngestAsync(ingestionRequest);
Console.WriteLine("Trace successfully sent to Langfuse!");
}
catch (Exception ex)
{
Console.WriteLine(public class MyAIService
{
private readonly LangfuseClient _langfuseClient;
public MyAIService(LangfuseClient langfuseClient)
{
_langfuseClient = langfuseClient;
}
public async Task ProcessRequestAsync(string userInput)
{
// 1. 创建一个 Trace 对象
var trace = new LangfuseTrace
{
Name = "User Query Processing (.NET)",
UserId = "test-user-dotnet",
Tags = new List<string> { "dotnet_example" },
Metadata = new { requestSource = "api_v2" }
};
// 准备批量提交的事件列表
var eventsToIngest = new List<LangfuseEventBase> { trace };
// 2. 创建一个顶层 Span (数据预处理)
var preprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id, // 关联到上面的 Trace
Name = "Data Preprocessing",
StartTime = DateTime.UtcNow,
Input = new { rawInput = userInput }
};
eventsToIngest.Add(preprocessingSpan);
// 模拟一些处理
await Task.Delay(100); // 模拟耗时
string processedInput = $"Processed: {userInput.ToUpper()}";
preprocessingSpan.Output = new { processedData = processedInput };
preprocessingSpan.EndTime = DateTime.UtcNow;
// 3. 创建一个 Generation (LLM 调用),作为上一个 Span 的子节点
var llmGeneration = new LangfuseGeneration
{
TraceId = trace.Id, // 关联到 Trace
ParentObservationId = preprocessingSpan.Id, // 父节点是预处理 Span
Name = "OpenAI Call - gpt-3.5-turbo",
StartTime = DateTime.UtcNow,
Model = "gpt-3.5-turbo",
Input = new [] // OpenAI messages format
{
new { role = "system", content = "You are a helpful assistant." },
new { role = "user", content = processedInput }
},
ModelParameters = new Dictionary<string, object> { { "temperature", 0.7 } }
};
eventsToIngest.Add(llmGeneration);
// 模拟 LLM 调用和获取结果
await Task.Delay(300); // 模拟 LLM 耗时
string llmResponse = $"LLM Response to: {processedInput}";
llmGeneration.Output = new { role = "assistant", content = llmResponse };
llmGeneration.Usage = new LangfuseUsage
{
InputTokens = 50, // 假设值
OutputTokens = 30 // 假设值
};
llmGeneration.EndTime = DateTime.UtcNow;
// 4. 创建另一个 Span (后处理),也是 Trace 的直接子节点
var postprocessingSpan = new LangfuseSpan
{
TraceId = trace.Id,
Name = "Response Postprocessing",
StartTime = DateTime.UtcNow,
Input = new { llmOutput = llmResponse }
};
eventsToIngest.Add(postprocessingSpan);
await Task.Delay(50);
string finalResponse = $"Final: {llmResponse}!!!";
postprocessingSpan.Output = new { finalData = finalResponse };
postprocessingSpan.EndTime = DateTime.UtcNow;
// 准备 ingestion 请求
var ingestionRequest = new LangfuseIngestionRequest
{
Batch = eventsToIngest
};
// 发送!
try
{
await _langfuseClient.IngestAsync(ingestionRequest);
Console.WriteLine("Trace successfully sent to Langfuse!");
}
catch (Exception ex)
{
Console.WriteLine($"Error sending trace to Langfuse: {ex.Message}");
// 在这里可以添加更完善的错误处理逻辑,比如重试或记录到备用日志系统
}
}
}
// ----- 主程序入口示例 (例如 Program.cs) -----
// public class Program
// {
// public static async Task Main(string[] args)
// {
// var config = new LangfuseConfig
// {
// PublicKey = Environment.GetEnvironmentVariable("LANGFUSE_PUBLIC_KEY") ?? "your_public_key",
// SecretKey = Environment.GetEnvironmentVariable("LANGFUSE_SECRET_KEY") ?? "your_secret_key",
// // BaseUrl = "https://your-self-hosted-langfuse.com" // 如果自托管
// };
// if (config.PublicKey == "your_public_key" || config.SecretKey == "your_secret_key")
// {
// Console.WriteLine("Please set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables.");
// return;
// }
// var langfuseClient = new LangfuseClient(config);
// var myService = new MyAIService(langfuseClient);
// await myService.ProcessRequestAsync("Hello Langfuse from .NET!");
// }
// }
quot;Error sending trace to Langfuse: {ex.Message}");
// 在这里可以添加更完善的错误处理逻辑,比如重试或记录到备用日志系统
}
}
}
// ----- 主程序入口示例 (例如 Program.cs) -----
// public class Program
// {
// public static async Task Main(string[] args)
// {
// var config = new LangfuseConfig
// {
// PublicKey = Environment.GetEnvironmentVariable("LANGFUSE_PUBLIC_KEY") ?? "your_public_key",
// SecretKey = Environment.GetEnvironmentVariable("LANGFUSE_SECRET_KEY") ?? "your_secret_key",
// // BaseUrl = "https://your-self-hosted-langfuse.com" // 如果自托管
// };
// if (config.PublicKey == "your_public_key" || config.SecretKey == "your_secret_key")
// {
// Console.WriteLine("Please set LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY environment variables.");
// return;
// }
// var langfuseClient = new LangfuseClient(config);
// var myService = new MyAIService(langfuseClient);
// await myService.ProcessRequestAsync("Hello Langfuse from .NET!");
// }
// }
记得替换 LANGFUSE_PUBLIC_KEY
和 LANGFUSE_SECRET_KEY
为你自己的密钥。上面的示例演示了如何创建 Trace,以及如何将 Span 和 Generation 与 Trace 关联起来,Generation 还可以作为 Span 的子节点。Id
、TraceId
和 ParentObservationId
是构建这种层级关系的关键。所有时间戳都应该是 UTC 时间,并符合 ISO 8601 格式 ( yyyy-MM-ddTHH:mm:ss.fffZ
),DateTime.UtcNow
在 C# 中配合 System.Text.Json
默认就能序列化成合适的格式。
4. 安全小贴士
- 密钥管理: 千万别把 Public Key 和 Secret Key 硬编码到代码里。推荐使用环境变量、Azure Key Vault、AWS Secrets Manager 或 .NET 的
secrets.json
(开发环境) 和用户机密 (user secrets) 或其他配置管理工具来安全地存储和加载它们。 - HTTPS: 确保你的 Langfuse 实例 (无论是云版本还是自托管) 使用 HTTPS,保障数据传输安全。
HttpClient
默认就会验证服务器证书。
5. 进阶使用技巧
- 异步和并发: Langfuse 调用是网络 I/O 操作,使用
async/await
可以避免阻塞线程,提升应用吞吐量。如果需要批量发送非常多的独立 Trace,可以考虑并发发送,但要注意 Langfuse 服务端的速率限制。 - 错误处理和重试: 网络请求总有失败的可能。对
IngestAsync
调用进行try-catch
,并根据错误类型(比如HttpRequestException
)实现重试逻辑(例如使用 Polly 库)可以增加系统的健壮性。 - 批量优化: 上面的
LangfuseIngestionRequest
中的Batch
列表本身就是为了批量提交设计的。如果你在一个请求处理流程中产生了多个独立的观测(比如一次 Trace 及其所有 Spans/Generations),把它们都收集到Batch
数组里,一次POST
请求发过去,比每个 Observation 单独发一次效率高得多。 - 元数据 (Metadata) 和标签 (Tags): 充分利用
metadata
和tags
字段。metadata
可以存储任意结构化数据,非常适合添加上下文信息、调试参数等。tags
则方便你在 Langfuse UI 中进行筛选和分类。 Idempotency-Key
Header (可选): 如果你的场景中可能因为网络问题等导致重复发送相同的事件批次,并且希望避免重复记录,可以考虑在请求头中加入Idempotency-Key
。其值为一个唯一的字符串(比如 UUID),Langfuse 服务端在一段时间内看到相同的Idempotency-Key
,会认为这是同一次请求,只处理第一个。不过,我们示例中每个Event都有唯一ID,Langfuse服务端通常也能处理好幂等性。
搞定!通过这种方式,你就能纯粹用 .NET 代码跟 Langfuse 愉快地玩耍了,Python 小老弟可以光荣退休(至少在这部分任务上)。这样不仅项目结构更统一,部署也省心不少。