[C#] Operating HBase via Thrift (series)

奋斗吧

51CTO Blog, 2023-07-24 18:24:47



An aside: several ways to call Java from C#

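1. Publish the Java-side interfaces as a WebService, which C# can then call easily.

2. Call Java from C++ through JNI, and have C# call the C++ interface.

3. Use an open-source library that lets C# call Java directly.

4. Use IKVM to call Java from C#; see http://www.ikvm.net/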

 

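I bring these up because my client needed to call HBase's interfaces, which are implemented in Java. At first I used the WebService approach, which is simple and very general. Later I discovered the third approach above and got it working. However, the author of that library no longer seems to maintain it: it has several obvious bugs, and calling it in a loop causes memory errors. Since I'm not very familiar with JNI, I gave up on it. Anyone interested is welcome to improve it. One more caveat: the library depends on jvm.dll and only works with a 32-bit Java JDK. I didn't look closely at the second and fourth approaches, so I won't cover them here.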

 

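In the end I didn't use any of the approaches above and chose Thrift instead. It is somewhat slower than the Java API, but still within an acceptable range. Now, on to the main topic: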

 

Preparation:

1. Download the Thrift source package: http://thrift.apache.org/

2. Download the Thrift compiler for Windows: http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.0/thrift-0.9.0.exe

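Generating the Thrift interface classes:

1. Get the Hbase.thrift file from the HBase package. It is in the ..\hbase-0.94.6.1\src\main\resources\org\apache\hadoop\hbase\thrift directory.

2. Put thrift-0.9.0.exe and Hbase.thrift in the same directory. This is not required, but it keeps the command short.

3. Open a command prompt in that directory and run:

    thrift-0.9.0.exe -gen csharp Hbase.thrift

This creates a folder named gen-csharp in the same directory.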

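Building the solution

With all the code ready, create a new VS project. Then add the Thrift C# library project (under lib/csharp in the source package) and the interface classes you just generated.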

[Screenshot: solution structure]

Starting the cluster's Thrift service

    hbase-daemon.sh start thrift

The default port is 9090.
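Before running any client code, it can save time to confirm that the Thrift server is actually listening. The following is a minimal sketch that uses only the .NET base library. ThriftPortCheck and IsPortOpen are hypothetical names of our own. An open port only shows that something accepts TCP connections, not that it is the HBase Thrift server.

```csharp
using System;
using System.Net.Sockets;

public static class ThriftPortCheck
{
    // Returns true if a TCP connection to host:port succeeds within timeoutMs.
    // This only shows that the port accepts connections; it does not check the protocol.
    public static bool IsPortOpen(string host, int port, int timeoutMs)
    {
        using (var tcp = new TcpClient())
        {
            try
            {
                // Wait returns false if the connect attempt has not finished in time.
                return tcp.ConnectAsync(host, port).Wait(timeoutMs) && tcp.Connected;
            }
            catch (AggregateException)
            {
                // The connect task faulted, e.g. connection refused or unknown host.
                return false;
            }
            catch (SocketException)
            {
                // Some failures can surface synchronously instead of through the task.
                return false;
            }
        }
    }
}
```

For example, `ThriftPortCheck.IsPortOpen("192.168.2.200", 9090, 3000)` checks the server address that the Helper class uses later in this post.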

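Writing test code

[Screenshot: test code]

Among the classes generated from Hbase.thrift, Hbase.cs is the core one. Every interface a client uses to operate on HBase is defined and implemented in it. For the Thrift server-side code, refer to the HBase source.

The query interfaces defined in the class are: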

 



List<TRowResult> getRow(byte[] tableName, byte[] row, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowWithColumns(byte[] tableName, byte[] row, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowTs(byte[] tableName, byte[] row, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRows(byte[] tableName, List<byte[]> rows, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowsWithColumns(byte[] tableName, List<byte[]> rows, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowsTs(byte[] tableName, List<byte[]> rows, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowsWithColumnsTs(byte[] tableName, List<byte[]> rows, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithScan(byte[] tableName, TScan scan, Dictionary<byte[], byte[]> attributes);
int scannerOpen(byte[] tableName, byte[] startRow, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithStop(byte[] tableName, byte[] startRow, byte[] stopRow, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithPrefix(byte[] tableName, byte[] startAndPrefix, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
int scannerOpenTs(byte[] tableName, byte[] startRow, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithStopTs(byte[] tableName, byte[] startRow, byte[] stopRow, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> scannerGet(int id);
List<TRowResult> scannerGetList(int id, int nbRows);
void scannerClose(int id);
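The *Ts variants in the list above (getRowTs, scannerOpenTs and so on) take a long timestamp. HBase cell timestamps are milliseconds since the Unix epoch, the same unit as Java's System.currentTimeMillis(), and not the unit of .NET's DateTime.Ticks. The following is a minimal conversion sketch. HBaseTime is a hypothetical helper name. How the server interprets the value for each method (exact version or upper bound) is defined by the comments in Hbase.thrift, not by this code.

```csharp
using System;

public static class HBaseTime
{
    // HBase cell timestamps are milliseconds since 1970-01-01T00:00:00Z,
    // the same unit as Java's System.currentTimeMillis().
    public static long ToHBaseTimestamp(DateTime time)
    {
        // DateTimeOffset uses offset 0 for Utc values and the local offset
        // for Local or Unspecified values.
        return new DateTimeOffset(time).ToUnixTimeMilliseconds();
    }

    // Converts an HBase timestamp back to a UTC DateTime.
    public static DateTime FromHBaseTimestamp(long millis)
    {
        return DateTimeOffset.FromUnixTimeMilliseconds(millis).UtcDateTime;
    }
}
```

For example, `client.getRowTs(table, row, HBaseTime.ToHBaseTimestamp(DateTime.UtcNow), null)` passes the current time in the unit HBase expects.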

 

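Below are a few of the interfaces I use most in my project. Anyone who has used the HBase API will find all of the above straightforward.

1. getRow (the simplest kind of query: fetch data by rowkey; most of the parameters are byte arrays)

[Screenshot: getRow example code]

Result:

[Screenshot: getRow output]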
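Because the parameters and results are byte arrays, one .NET detail is worth knowing. In the generated classes, TRowResult.Columns is a Dictionary<byte[], TCell>. A Dictionary keyed by byte[] compares keys by reference, so looking up a column with a freshly encoded Encoding.UTF8.GetBytes("i:Data") will not find it. That is why the examples in this post iterate over Columns instead of indexing into it.

The following is a minimal sketch of two workarounds. ByteArrayComparer and RowColumns are hypothetical helpers of our own, not part of Thrift. TValue stands in for the generated TCell.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Compares byte[] keys by content instead of by reference.
public sealed class ByteArrayComparer : IEqualityComparer<byte[]>
{
    public bool Equals(byte[] x, byte[] y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x == null || y == null || x.Length != y.Length) return false;
        for (int i = 0; i < x.Length; i++)
            if (x[i] != y[i]) return false;
        return true;
    }

    public int GetHashCode(byte[] obj)
    {
        if (obj == null) return 0;
        unchecked
        {
            int hash = 17;
            foreach (byte b in obj) hash = hash * 31 + b;
            return hash;
        }
    }
}

public static class RowColumns
{
    // Copies a Dictionary<byte[], TValue> (such as TRowResult.Columns) into one
    // keyed by the UTF-8 column name, so lookups like cells["i:Data"] work.
    public static Dictionary<string, TValue> ByColumnName<TValue>(Dictionary<byte[], TValue> columns)
    {
        var result = new Dictionary<string, TValue>();
        foreach (var pair in columns)
            result[Encoding.UTF8.GetString(pair.Key)] = pair.Value;
        return result;
    }
}
```

With a real row, `var cells = RowColumns.ByColumnName(row.Columns);` followed by `Encoding.UTF8.GetString(cells["i:Data"].Value)` reads a single column. Alternatively, `new Dictionary<byte[], TCell>(row.Columns, new ByteArrayComparer())` keeps byte[] keys but compares them by content.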

2. scannerOpenWithStop (fetch data by a RowKey range)

 



// transport and client are static fields initialized as in the Helper class at the end of this post

/// <summary>
/// Fetches data by a RowKey range
/// </summary>
/// <param name="tablename">Table name</param>
/// <param name="stRowkey">Start row key (inclusive)</param>
/// <param name="endRowkey">Stop row key (exclusive)</param>
/// <remarks>The result set includes the row for StartRowKey but not the row for EndRowKey</remarks>
static void GetDataFromHBaseThroughRowKeyRange(string tablename, string stRowkey, string endRowkey)
{
    transport.Open();
    try
    {
        int ScannerID = client.scannerOpenWithStop(Encoding.UTF8.GetBytes(tablename),
            Encoding.UTF8.GetBytes(stRowkey), Encoding.UTF8.GetBytes(endRowkey),
            new List<byte[]> { Encoding.UTF8.GetBytes("i:Data") }, null);

        // Fetch at most 100 rows, then release the scanner on the server
        List<TRowResult> result = client.scannerGetList(ScannerID, 100);
        client.scannerClose(ScannerID);
        foreach (var key in result)
        {
            Console.WriteLine(Encoding.UTF8.GetString(key.Row));

            foreach (var k in key.Columns)
            {
                Console.Write(Encoding.UTF8.GetString(k.Key) + "\t");
                Console.WriteLine(Encoding.UTF8.GetString(k.Value.Value));
                Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
            }
        }
    }
    finally
    {
        transport.Close();
    }
}

// Call it
static void Main(string[] args)
{
    GetDataFromHBaseThroughRowKeyRange("HStudy", "001", "006");
}


Result:

[Screenshot: range scan output]
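The remark in the code above (start row included, end row excluded) relies on how HBase orders row keys: by unsigned lexicographic byte comparison, not numerically. The following is a minimal in-memory model of the range that scannerOpenWithStop returns, for reasoning about which keys a given start/stop pair covers. RowKeyRange is a hypothetical helper of our own and does not talk to HBase.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public static class RowKeyRange
{
    // HBase orders row keys by unsigned lexicographic byte comparison.
    public static int CompareRowKeys(byte[] a, byte[] b)
    {
        int n = Math.Min(a.Length, b.Length);
        for (int i = 0; i < n; i++)
        {
            int diff = a[i] - b[i]; // byte is unsigned in C#
            if (diff != 0) return diff;
        }
        return a.Length - b.Length; // a proper prefix sorts first
    }

    // Models a start/stop scan: startRow <= key < stopRow, in HBase order.
    public static List<string> InRange(IEnumerable<string> keys, string startRow, string stopRow)
    {
        byte[] start = Encoding.UTF8.GetBytes(startRow);
        byte[] stop = Encoding.UTF8.GetBytes(stopRow);
        return keys
            .Select(k => new { Key = k, Bytes = Encoding.UTF8.GetBytes(k) })
            .Where(k => CompareRowKeys(k.Bytes, start) >= 0 && CompareRowKeys(k.Bytes, stop) < 0)
            .OrderBy(k => k.Bytes, Comparer<byte[]>.Create(CompareRowKeys))
            .Select(k => k.Key)
            .ToList();
    }
}
```

With keys such as "001", "006", "01", "10" and "9", the range ["001", "006") contains only the zero-padded keys below "006", and "10" sorts before "9". This is why fixed-width keys like the "001" used here make byte order match numeric order.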

3. scannerOpenWithPrefix (match rows by RowKey prefix)

 



/// <summary>
/// Filters by RowKey prefix
/// </summary>
/// <param name="tablename">Table name</param>
/// <param name="Prefixrowkey">Row key prefix to match</param>
static void GetDataFromHBaseThroughRowKeyPrefix(string tablename, string Prefixrowkey)
{
    transport.Open();
    try
    {
        int ScannerID = client.scannerOpenWithPrefix(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(Prefixrowkey),
            new List<byte[]> { Encoding.UTF8.GetBytes("i:Data") }, null);
        /*
         *  scannerGet(int id) is implemented on the server as scannerGetList(id, 1),
         *  i.e. one row per call; scannerGetList fetches up to nbRows rows at once.
         */
        List<TRowResult> result = client.scannerGetList(ScannerID, 100);
        client.scannerClose(ScannerID);  // release the scanner on the server
        foreach (var key in result)
        {
            Console.WriteLine(Encoding.UTF8.GetString(key.Row));

            foreach (var k in key.Columns)
            {
                Console.Write(Encoding.UTF8.GetString(k.Key) + "\t");
                Console.WriteLine(Encoding.UTF8.GetString(k.Value.Value));
                Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
            }
        }
    }
    finally
    {
        transport.Close();
    }
}

// Call it
static void Main(string[] args)
{
    GetDataFromHBaseThroughRowKeyPrefix("HStudy", "00");
}
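A prefix query can also be written as a range scan with scannerOpenWithStop. The stop row is the smallest key greater than every key that starts with the prefix: the prefix with trailing 0xFF bytes dropped and its last remaining byte incremented. For prefix "00" that gives the range ["00", "01"). The following is a minimal sketch of that computation. RowKeyPrefix is a hypothetical helper name.

```csharp
using System;

public static class RowKeyPrefix
{
    // Returns the exclusive stop row covering every key that starts with prefix,
    // or null when no such key exists (empty or all-0xFF prefix), which means
    // "scan to the end of the table".
    public static byte[] StopRowForPrefix(byte[] prefix)
    {
        int end = prefix.Length;
        // A trailing 0xFF cannot be incremented without overflow; drop it.
        while (end > 0 && prefix[end - 1] == 0xFF)
            end--;
        if (end == 0)
            return null;
        byte[] stop = new byte[end];
        Array.Copy(prefix, stop, end);
        stop[end - 1]++; // safe: this byte is below 0xFF
        return stop;
    }
}
```

When the result is null, use scannerOpen (start row only) instead of scannerOpenWithStop.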

 

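4. scannerOpenWithScan (query with a filter)

This is probably the most involved of the query interfaces, because it uses a filter, i.e. a Filter from the HBase API. One of its parameters is of type TScan, whose basic structure is: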

 



public partial class TScan : TBase
{
    private byte[] _startRow;
    private byte[] _stopRow;
    private long _timestamp;
    private List<byte[]> _columns;
    private int _caching;
    private byte[] _filterString;
}

 

 

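The first few fields need no explanation, and I won't go over the HBase API's various Filters here, so let's look at _filterString. Take the common SingleColumnValueFilter as an example. A filter that matches rows whose PatientName contains 小红 (the substring: comparator does a substring match) is defined like this:

            string filterString = "SingleColumnValueFilter('s','PatientName',=,'substring:小红')";

            byte[] _filterString = Encoding.UTF8.GetBytes(filterString);

To combine several filters, join them with AND (OR is also supported).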
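Filter strings are easy to get wrong by hand, especially when a value itself contains a single quote. In the HBase filter language, string literals are single-quoted, and a quote inside a literal is escaped by doubling it. The following is a minimal builder sketch for the SingleColumnValueFilter form and AND composition shown above. FilterStrings and its methods are hypothetical names, not part of Thrift or HBase.

```csharp
using System;
using System.Linq;

public static class FilterStrings
{
    // Wraps a value in single quotes, doubling any embedded quote.
    static string Quote(string s)
    {
        return "'" + s.Replace("'", "''") + "'";
    }

    // Builds SingleColumnValueFilter('family','qualifier',op,'comparator').
    public static string SingleColumnValue(string family, string qualifier, string op, string comparator)
    {
        return "SingleColumnValueFilter(" + Quote(family) + "," + Quote(qualifier) + ","
            + op + "," + Quote(comparator) + ")";
    }

    // Joins filters with AND, parenthesizing each one.
    public static string And(params string[] filters)
    {
        return string.Join(" AND ", filters.Select(f => "(" + f + ")"));
    }
}
```

For example, `FilterStrings.And(FilterStrings.SingleColumnValue("s", "PatientName", "=", "substring:Jim"), FilterStrings.SingleColumnValue("s", "PatientSex", "=", "substring:10"))` builds the two-filter query used in the next example.

SingleColumnValueFilter also accepts two optional boolean arguments (filterIfColumnMissing, latestVersionOnly). By default, rows that lack the tested column pass through, so a PatientName filter can also return rows that have no PatientName at all.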

 



/// <summary>
/// Scans data using a Filter
/// </summary>
/// <param name="tablename">Table name</param>
/// <param name="filterString">Filter expression in the HBase filter language</param>
/// <param name="_cols">Columns to return, as UTF-8 "family:qualifier" byte arrays</param>
static void GetDataFromHBaseThroughFilter(string tablename, string filterString, List<byte[]> _cols)
{
    TScan _scan = new TScan();
    // e.g. SingleColumnValueFilter('i', 'Data', =, '2')
    _scan.FilterString = Encoding.UTF8.GetBytes(filterString);
    _scan.Columns = _cols;
    transport.Open();
    try
    {
        int ScannerID = client.scannerOpenWithScan(Encoding.UTF8.GetBytes(tablename), _scan, null);

        List<TRowResult> result = client.scannerGetList(ScannerID, 100);
        client.scannerClose(ScannerID);
        foreach (var key in result)
        {
            Console.WriteLine(Encoding.UTF8.GetString(key.Row));

            foreach (var k in key.Columns)
            {
                Console.Write(Encoding.UTF8.GetString(k.Key) + "\t");
                Console.WriteLine(Encoding.UTF8.GetString(k.Value.Value));
                Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
            }
        }
    }
    finally
    {
        transport.Close();
    }
}

static void Main(string[] args)
{
    // GetDataFromHBaseThroughRowKeyRange("HImages", "123.456.1", "123.456.9");
    List<byte[]> _byte = new List<byte[]>();
    _byte.Add(Encoding.UTF8.GetBytes("s:PatientName"));
    _byte.Add(Encoding.UTF8.GetBytes("s:StudyInstanceUID"));
    _byte.Add(Encoding.UTF8.GetBytes("s:PatientSex"));

    // Two filters combined with AND:
    // string filterString = "((SingleColumnValueFilter('s','PatientName',=,'substring:Jim')) AND (SingleColumnValueFilter('s','PatientSex',=,'substring:10')))";
    string filterString = "SingleColumnValueFilter('s','PatientName',=,'substring:小红')";
    GetDataFromHBaseThroughFilter("HStudy", filterString, _byte);
    Console.ReadLine();
}
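Each scan example above fetches at most 100 rows with a single scannerGetList(ScannerID, 100) call, so larger result sets are silently truncated. Reading in batches until the scanner returns an empty list, then always calling scannerClose, avoids that. (Per the Hbase.thrift documentation, an empty list signals an exhausted scanner; please verify this against your HBase version.)

The following is a minimal sketch. ScannerUtil.ReadAll is a hypothetical helper. It takes the two client calls as delegates so that it compiles and can be tested without the generated Hbase.Client.

```csharp
using System;
using System.Collections.Generic;

public static class ScannerUtil
{
    // Reads every row from an open scanner in batches of batchSize,
    // then closes the scanner even if a read throws.
    public static List<T> ReadAll<T>(int scannerId,
                                     Func<int, int, List<T>> getList,
                                     Action<int> close,
                                     int batchSize = 100)
    {
        var all = new List<T>();
        try
        {
            while (true)
            {
                List<T> batch = getList(scannerId, batchSize);
                // An empty batch means there are no more rows.
                if (batch == null || batch.Count == 0)
                    break;
                all.AddRange(batch);
            }
        }
        finally
        {
            close(scannerId);
        }
        return all;
    }
}
```

With the real client: `List<TRowResult> rows = ScannerUtil.ReadAll<TRowResult>(ScannerID, client.scannerGetList, client.scannerClose);`. The batch size trades round trips against memory per call.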


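Next come the Thrift APIs used most often (create table, delete table, insert data, update data, delete data). At the end I post a Helper class I wrote to make Thrift easier to use.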

 

 



void createTable(byte[] tableName, List<ColumnDescriptor> columnFamilies);
void mutateRow(byte[] tableName, byte[] row, List<Mutation> mutations, Dictionary<byte[], byte[]> attributes);
void mutateRowTs(byte[] tableName, byte[] row, List<Mutation> mutations, long timestamp, Dictionary<byte[], byte[]> attributes);
void mutateRows(byte[] tableName, List<BatchMutation> rowBatches, Dictionary<byte[], byte[]> attributes);
void mutateRowsTs(byte[] tableName, List<BatchMutation> rowBatches, long timestamp, Dictionary<byte[], byte[]> attributes);
void deleteTable(byte[] tableName);
void deleteAll(byte[] tableName, byte[] row, byte[] column, Dictionary<byte[], byte[]> attributes);
void deleteAllTs(byte[] tableName, byte[] row, byte[] column, long timestamp, Dictionary<byte[], byte[]> attributes);
void deleteAllRow(byte[] tableName, byte[] row, Dictionary<byte[], byte[]> attributes);
void deleteAllRowTs(byte[] tableName, byte[] row, long timestamp, Dictionary<byte[], byte[]> attributes);



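Note that Thrift uses the same functions (the mutateRow family) both to insert and to update a row. Anyone who has used the HBase API won't be surprised: writing to an existing cell simply stores a newer version. These APIs are all fairly simple, so below I just post the Helper class and a simple test program.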

 


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ThriftHelper;
using IThrift;
using Thrift.Transport;
using Thrift.Protocol;
namespace Test
{
    class Program
    {
        /*
         *   Table: ITest
         *   Column family: i
         *   Column: i:one
         */
        static void Main(string[] args)
        {
            #region Test

            Helper.Open();
            Printer("CreateTable:");
            ColumnDescriptor _cd = new ColumnDescriptor();
            _cd.Name = Encoding.UTF8.GetBytes("i");

            if (Helper.CreateTable("ITest", new List<ColumnDescriptor> { _cd }))
                Printer("CreateTable is Success");
            else
                Printer("CreateTable Occurred Error");

            Printer("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            Printer("MutateRowHBase:");
            // Insert: write "1" to i:one in row 001
            Mutation _mutation = new Mutation();
            _mutation.Column = Encoding.UTF8.GetBytes("i:one");
            _mutation.Value = Encoding.UTF8.GetBytes("1");

            if (Helper.MutateRowHBase("ITest", "001", new List<Mutation> { _mutation }))
                Printer("MutateRowHBase is Success");
            else
                Printer("MutateRowHBase Occurred Error");

            Printer("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Printer("GetDataFromHBase:");
            List<TRowResult> _result = Helper.GetDataFromHBase("ITest", "001");
            Printer(_result);

            Printer("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Printer("MutateRowHBase:");
            // Update: the same call overwrites i:one with "-1"
            _mutation = new Mutation();
            _mutation.Column = Encoding.UTF8.GetBytes("i:one");
            _mutation.Value = Encoding.UTF8.GetBytes("-1");

            if (Helper.MutateRowHBase("ITest", "001", new List<Mutation> { _mutation }))
                Printer("MutateRowHBase is Success");
            else
                Printer("MutateRowHBase Occurred Error");

            Printer("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Printer("GetDataFromHBase:");
            _result = Helper.GetDataFromHBase("ITest", "001");
            Printer(_result);

            Printer("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Printer("DeleteRow:");
            if (Helper.DeleteAllRow("ITest", "001"))
                Printer("DeleteAllRow is Success");
            else
                Printer("DeleteAllRow Occurred Error");
            Helper.Close();

            #endregion

            Console.ReadKey();
        }

        // Prints each row key, then every column name and value in that row
        static void Printer(List<TRowResult> result)
        {
            if (result.Count == 0)
                return;
            foreach (var key in result)
            {
                Console.WriteLine(Encoding.UTF8.GetString(key.Row));

                foreach (var k in key.Columns)
                {
                    Console.Write(Encoding.UTF8.GetString(k.Key) + "\t");
                    Console.WriteLine(Encoding.UTF8.GetString(k.Value.Value));
                    Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
                }
            }
        }

        static void Printer(string content)
        {
            Console.WriteLine(content);
        }
    }
}


 

 

The Helper class:

 

 


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Thrift;
using IThrift;
using Thrift.Transport;
using Thrift.Protocol;

namespace ThriftHelper
{
    public static class Helper
    {
        // One shared connection to the HBase Thrift server (default port 9090)
        static TTransport transport = new TSocket("192.168.2.200", 9090);
        static TProtocol tProtocol = new TBinaryProtocol(transport);
        static Hbase.Client client = new Hbase.Client(tProtocol);

        public static void Open()
        {
            transport.Open();
        }

        public static void Close()
        {
            transport.Close();
        }

        /// <summary>
        /// Gets data by rowkey
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="rowkey">Row key</param>
        public static List<TRowResult> GetDataFromHBase(string tablename, string rowkey)
        {
            List<TRowResult> result = client.getRow(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey), null);
            return result;
        }

        /// <summary>
        /// Filters by RowKey prefix
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="Prefixrowkey">Row key prefix to match</param>
        /// <param name="_cols">Columns to return, e.g. "i:Data"</param>
        public static List<TRowResult> GetDataFromHBaseThroughRowKeyPrefix(string tablename, string Prefixrowkey, List<string> _cols)
        {
            List<byte[]> _bytes = new List<byte[]>();
            foreach (string str in _cols)
                _bytes.Add(Encoding.UTF8.GetBytes(str));

            int ScannerID = client.scannerOpenWithPrefix(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(Prefixrowkey),
                _bytes, null);
            /*
             *  scannerGet(int id) is implemented on the server as scannerGetList(id, 1)
             */
            List<TRowResult> result = client.scannerGetList(ScannerID, 100);  // at most 100 rows
            client.scannerClose(ScannerID);
            return result;
        }

        /// <summary>
        /// Fetches data by a RowKey range
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="stRowkey">Start row key (inclusive)</param>
        /// <param name="endRowkey">Stop row key (exclusive)</param>
        /// <param name="_cols">Columns to return, e.g. "i:Data"</param>
        /// <remarks>The result set includes the row for StartRowKey but not the row for EndRowKey</remarks>
        public static List<TRowResult> GetDataFromHBaseThroughRowKeyRange(string tablename,
            string stRowkey, string endRowkey, List<string> _cols)
        {
            List<byte[]> _bytes = new List<byte[]>();
            foreach (string str in _cols)
                _bytes.Add(Encoding.UTF8.GetBytes(str));

            int ScannerID = client.scannerOpenWithStop(Encoding.UTF8.GetBytes(tablename),
                Encoding.UTF8.GetBytes(stRowkey), Encoding.UTF8.GetBytes(endRowkey),
                _bytes, null);

            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            client.scannerClose(ScannerID);
            return result;
        }

        /// <summary>
        /// Scans data using a Filter
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="filterString">Filter expression in the HBase filter language</param>
        /// <param name="_cols">Columns to return, as UTF-8 byte arrays</param>
        public static List<TRowResult> GetDataFromHBaseThroughFilter(string tablename, string filterString, List<byte[]> _cols)
        {
            TScan _scan = new TScan();
            // e.g. SingleColumnValueFilter('i', 'Data', =, '2')
            _scan.FilterString = Encoding.UTF8.GetBytes(filterString);
            _scan.Columns = _cols;

            int ScannerID = client.scannerOpenWithScan(Encoding.UTF8.GetBytes(tablename), _scan, null);

            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            client.scannerClose(ScannerID);
            return result;
        }

        /// <summary>
        /// Inserts or updates cells in one row (the same call does both)
        /// </summary>
        public static bool MutateRowHBase(string tablename, string rowkey, List<Mutation> _mutations)
        {
            try
            {
                client.mutateRow(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey), _mutations, null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        /// <summary>
        /// Inserts or updates cells in several rows
        /// </summary>
        public static bool MutateRowsHBase(string tablename, List<BatchMutation> _BatchMutation)
        {
            try
            {
                client.mutateRows(Encoding.UTF8.GetBytes(tablename), _BatchMutation, null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        /// <summary>
        /// Deletes all cells of one column ("family:qualifier") in a row
        /// </summary>
        public static bool DeleteRowHBase(string tablename, string rowkey, string column)
        {
            try
            {
                client.deleteAll(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey),
                    Encoding.UTF8.GetBytes(column), null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        /// <summary>
        /// Deletes an entire row
        /// </summary>
        public static bool DeleteAllRow(string tablename, string rowkey)
        {
            try
            {
                client.deleteAllRow(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey), null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        public static bool DeleteTable(string tablename)
        {
            try
            {
                // HBase only deletes a table after it has been disabled
                client.disableTable(Encoding.UTF8.GetBytes(tablename));
                client.deleteTable(Encoding.UTF8.GetBytes(tablename));
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        public static bool CreateTable(string tablename, List<ColumnDescriptor> _cols)
        {
            try
            {
                client.createTable(Encoding.UTF8.GetBytes(tablename), _cols);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }
    }
}
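The Helper class shares one static TSocket and Hbase.Client across every caller. That is fine for the single-threaded test program. However, a Thrift client and its connection carry per-call state, so one instance should not be used by several threads at the same time.

The following is a minimal sketch, assuming you want to keep a single shared connection: serialize every call through one lock. SerializedClient is a hypothetical wrapper of our own. It is generic so it can be tested without the generated classes.

```csharp
using System;

// Serializes all calls made through one shared client instance.
public sealed class SerializedClient<TClient>
{
    private readonly TClient client;
    private readonly object gate = new object();

    public SerializedClient(TClient client)
    {
        this.client = client;
    }

    // Runs a call that returns a value (e.g. getRow) while holding the lock.
    public TResult Call<TResult>(Func<TClient, TResult> call)
    {
        lock (gate)
        {
            return call(client);
        }
    }

    // Runs a call with no return value (e.g. mutateRow) while holding the lock.
    public void Run(Action<TClient> call)
    {
        lock (gate)
        {
            call(client);
        }
    }
}
```

Usage with the generated client would look like `var shared = new SerializedClient<Hbase.Client>(client);` followed by `shared.Call(c => c.getRow(Encoding.UTF8.GetBytes("ITest"), Encoding.UTF8.GetBytes("001"), null))`. Opening and closing the shared transport should go through the same lock.

Under heavy concurrency, a single lock becomes a bottleneck. In that case, one connection per thread or a small connection pool is the usual alternative.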



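That's it for the basic Thrift operations. Thrift also supports some of HBase's more advanced operations, which I'll cover in future posts. Thanks for reading. My knowledge is limited, so please forgive any shortcomings.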

 

 
