主页 > IT业界  > 

C#将Box企业网盘里的文件批量上载到S3,并导入Redshift

C#将Box企业网盘里的文件批量上载到S3,并导入Redshift

用C#.NET 8将Box企业网盘里一个目录下的所有文件全部上载到S3的一个目录下,这些文件假设全是gzip压缩文件,然后全部导入Amazon Redshift数据库,要实现异步处理,异常处理和输出运行状态日志,所有参数信息来自ini配置文件。

将Box企业网盘里的文件上传到Amazon S3,你需要分别使用Box API和Amazon S3 API。在C#.NET 8中,你可以使用相应的SDK来简化这个过程。以下是一个大致的步骤指南:

设置Box API和Amazon S3的访问权限:

• 获取Box API的Client ID、Client Secret和Access Token(或使用OAuth流程获取)。

• 配置Amazon S3的Access Key、Secret Key、Bucket名称等。

下载Box企业网盘里的文件:

• 使用Box API SDK下载文件。你可能需要先列出文件,找到你想要下载的那个,然后获取其下载URL或使用API直接下载。

将文件上传到Amazon S3:

• 使用Amazon S3 API SDK将下载的文件上传到指定的Bucket中。

错误处理和日志记录:

• 在整个过程中添加适当的错误处理和日志记录,以便在出现问题时能够追踪和调试。

以下是一个简化C#代码示例,展示了如何使用Box API SDK下载文件并使用Amazon S3 API SDK上传文件(注意:你需要安装Box.V2和AWSSDK.S3的NuGet包):

using Box.V2; using Box.V2.Auth; using Box.V2.Models; using Amazon; using Amazon.S3; using Amazon.S3.Model; using Amazon.S3.Transfer; using System; using System.IO; using System.Threading.Tasks; class Program { static async Task Main(string[] args) { // Box API 配置 string boxClientId = "your-box-client-id"; string boxClientSecret = "your-box-client-secret"; string boxAccessToken = "your-box-access-token"; // 或者使用OAuth流程获取 // Amazon S3 配置 string awsAccessKey = "your-aws-access-key"; string awsSecretKey = "your-aws-secret-key"; string bucketName = "your-s3-bucket-name"; string s3Region = RegionEndpoint.USEast1.SystemName; // 根据你的Bucket区域设置 // Box API 初始化 var boxConfig = new BoxConfig(boxClientId, boxClientSecret, new Uri(" api.box /2.0/")); var boxSession = new OAuthSession(boxConfig, new Uri(" account.box /api/oauth2/authorize"), boxAccessToken); var boxClient = new BoxClient(boxConfig, boxSession); // 查找并下载Box文件(这里假设你已经知道文件ID) string fileId = "your-box-file-id"; var fileRequest = new BoxFilesRequest(fileId); var fileInfo = await boxClient.FilesManager.GetInformationAsync(fileRequest); // 下载文件到本地临时路径 string tempFilePath = Path.GetTempFileName(); using (var client = new System.Net.Http.HttpClient()) { var downloadResponse = await client.GetAsync(fileInfo.DownloadUrl); downloadResponse.EnsureSuccessStatusCode(); using (var fs = new FileStream(tempFilePath, FileMode.Create, FileAccess.Write)) { await downloadResponse.Content.CopyToAsync(fs); } } // Amazon S3 初始化 var s3Client = new AmazonS3Client(awsAccessKey, awsSecretKey, RegionEndpoint.GetBySystemName(s3Region)); // 上传文件到S3 string s3Key = "your-s3-key"; // 文件在S3中的名称 TransferUtility transferUtility = new TransferUtility(s3Client); transferUtility.Upload(tempFilePath, bucketName, s3Key); // 清理临时文件 File.Delete(tempFilePath); Console.WriteLine("文件已成功从Box上传到S3!"); } }

注意:

• 上面的代码示例是为了说明流程而简化的,它没有处理所有可能的错误情况。

• 在实际部署中,你应该避免在代码中硬编码敏感信息(如API密钥和访问令牌),而是使用环境变量或安全的密钥管理服务。

• 你可能需要根据Box API和Amazon S3 API的最新文档来调整代码。

• Box文件的下载URL可能需要额外的处理,特别是如果它是通过OAuth流程获取的预签名URL。

• 上面的代码使用了TransferUtility来简化S3上传过程,但你也可以使用PutObjectRequest来更细粒度地控制上传过程。

完整解决方案包含异步处理、异常处理、日志记录及INI配置文件读取:

using Box.V2; using Box.V2.Auth; using Box.V2.Config; using Box.V2.Models; using Amazon.S3; using Amazon.S3.Transfer; using Nett; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks; // 配置文件结构 public class AppConfig { public BoxSettings Box { get; set; } public S3Settings S3 { get; set; } public RedshiftSettings Redshift { get; set; } } public class BoxSettings { public string ClientId { get; set; } public string ClientSecret { get; set; } public string AccessToken { get; set; } public string FolderId { get; set; } } public class S3Settings { public string AccessKey { get; set; } public string SecretKey { get; set; } public string BucketName { get; set; } public string Region { get; set; } public string TargetPath { get; set; } } public class RedshiftSettings { public string ConnectionString { get; set; } public string TableName { get; set; } public string IamRoleArn { get; set; } } public static class Logger { private static readonly object _lock = new(); public static void Log(string message, string level = "INFO") { var logEntry = $"{DateTime.Now:yyyy-MM-dd HH:mm:ss} [{level}] {message}"; lock (_lock) { File.AppendAllText("transfer.log", logEntry + Environment.NewLine); } } } public class BoxS3RedshiftService { private readonly AppConfig _config; private readonly BoxClient _boxClient; private readonly IAmazonS3 _s3Client; private readonly SemaphoreSlim _semaphore = new(5); public BoxS3RedshiftService(AppConfig config) { _config = config; _boxClient = InitializeBoxClient(config.Box); _s3Client = InitializeS3Client(config.S3); } private BoxClient InitializeBoxClient(BoxSettings settings) { var config = new BoxConfig(settings.ClientId, settings.ClientSecret, new Uri(" api.box /2.0/")); var session = new OAuthSession(config, new Uri(" account.box /api/oauth2/authorize"), settings.AccessToken); return new BoxClient(config, session); } private IAmazonS3 InitializeS3Client(S3Settings settings) { var region = Amazon.RegionEndpoint.GetBySystemName(settings.Region); return new AmazonS3Client(settings.AccessKey, settings.SecretKey, region); } public async Task ProcessFilesAsync() { try { Logger.Log("开始获取Box文件列表"); var files = await GetBoxFilesRecursiveAsync(_config.Box.FolderId); Logger.Log($"找到{files.Count}个待处理文件"); var tasks = files.Select(async file => { await _semaphore.WaitAsync(); try { await ProcessSingleFileAsync(file); } finally { _semaphore.Release(); } }); await Task.WhenAll(tasks); Logger.Log("所有文件处理完成"); } catch (Exception ex) { Logger.Log($"处理过程中发生全局异常: {ex}", "ERROR"); } } private async Task<List<BoxItem>> GetBoxFilesRecursiveAsync(string folderId) { var result = new List<BoxItem>(); var items = await _boxClient.FoldersManager.GetFolderItemsAsync(folderId, 1000); foreach (var item in items.Entries) { if (item.Type == "file" && item.Name.EndsWith(".gz")) { result.Add(item); } else if (item.Type == "folder") { result.AddRange(await GetBoxFilesRecursiveAsync(item.Id)); } } return result; } private async Task ProcessSingleFileAsync(BoxItem file) { string tempPath = null; try { // 下载文件 tempPath = await DownloadBoxFileAsync(file.Id); Logger.Log($"下载完成: {file.Name}"); // 上传S3 var s3Key = $"{_config.S3.TargetPath}/{file.Name}"; await UploadToS3Async(tempPath, s3Key); Logger.Log($"上传完成: {s3Key}"); // 导入Redshift await ImportToRedshiftAsync(s3Key); Logger.Log($"Redshift导入完成: {s3Key}"); } catch (Exception ex) { Logger.Log($"处理文件{file.Name}失败: {ex}", "ERROR"); } finally { if (File.Exists(tempPath)) { File.Delete(tempPath); } } } private async Task<string> DownloadBoxFileAsync(string fileId) { var tempPath = Path.GetTempFileName(); using var stream = await _boxClient.FilesManager.DownloadStreamAsync(fileId); using var fileStream = File.Create(tempPath); await stream.CopyToAsync(fileStream); return tempPath; } private async Task UploadToS3Async(string localPath, string s3Key) { var transferUtility = new TransferUtility(_s3Client); await transferUtility.UploadAsync(localPath, _config.S3.BucketName, s3Key); } private async Task ImportToRedshiftAsync(string s3Key) { using var conn = new Npgsql.NpgsqlConnection(_config.Redshift.ConnectionString); await conn.OpenAsync(); var copyCommand = $@"COPY {_config.Redshift.TableName} FROM 's3://{_config.S3.BucketName}/{s3Key}' CREDENTIALS 'aws_iam_role={_config.Redshift.IamRoleArn}' GZIP COMPUPDATE OFF STATUPDATE OFF"; using var cmd = new Npgsql.NpgsqlCommand(copyCommand, conn); await cmd.ExecuteNonQueryAsync(); } } class Program { static async Task Main(string[] args) { // 读取配置文件 var config = Toml.ReadFile<AppConfig>("config.toml"); // 初始化并执行服务 var service = new BoxS3RedshiftService(config); await service.ProcessFilesAsync(); } }

配套配置文件 (config.toml):

[Box] ClientId = "your_box_client_id" ClientSecret = "your_box_client_secret" AccessToken = "your_box_access_token" FolderId = "your_box_folder_id" [S3] AccessKey = "your_aws_access_key" SecretKey = "your_aws_secret_key" BucketName = "your_bucket_name" Region = "us-east-1" TargetPath = "imports/daily" [Redshift] ConnectionString = "Host=your-cluster.xxxxx.redshift.amazonaws ;Database=your_db;User Id=user;Password=password;" TableName = "sales_data" IamRoleArn = "arn:aws:iam::123456789012:role/RedshiftImportRole"

实现说明:

异步处理:

使用async/await进行异步编程通过SemaphoreSlim控制并发度(最大5个并行任务)使用Task.WhenAll等待所有文件处理完成

异常处理:

每个文件处理过程有独立try-catch全局异常处理包裹主流程使用finally块确保资源清理

日志记录:

线程安全的日志记录机制记录时间戳、日志级别和操作详情日志文件自动追加模式

配置管理:

使用TOML格式配置文件(需安装Nett包)支持Box、S3和Redshift的配置分离敏感信息不硬编码在代码中

扩展功能:

递归遍历Box目录自动清理临时文件Redshift COPY命令参数配置GZIP压缩文件自动识别

使用步骤:

安装依赖包:

dotnet add package Box.V2 dotnet add package AWSSDK.S3 dotnet add package Npgsql dotnet add package Nett

创建TOML配置文件

运行程序:

dotnet run

增强建议:

增加重试机制(使用Polly等重试库)添加进度报告功能实现Box Token自动刷新添加文件校验(MD5校验和)支持配置文件加密添加邮件/短信通知功能实现断点续传功能
标签:

C#将Box企业网盘里的文件批量上载到S3,并导入Redshift由讯客互联IT业界栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“C#将Box企业网盘里的文件批量上载到S3,并导入Redshift