[C#] Working with HBase through Thrift (series)
Tags: [C#] Working with HBase through Thrift (series), JavaScript Blog, 51CTO Blog
2023-07-24 18:24:47
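A digression: ways to call Java from C#
1. Publish the Java-side interface as a web service (WS), which C# can call easily.
2. Call Java from C++ through JNI, then have C# call the C++ interface.
3. Use an open-source library that lets C# call Java directly.
4. Use IKVM to call Java from C#; see http://www.ikvm.net/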
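I bring this up because our client needs to call the HBase interfaces, which are implemented in Java. At first I used the WS approach: it is simple and works almost everywhere. Later I came across the third approach above and got it working, but the author of that library no longer seems to maintain it. It has several obvious bugs, and calling it in a loop produced memory errors. Since I am not very familiar with JNI, I gave up on it; anyone interested is welcome to improve it. One more warning: the library depends on jvm.dll and only works with a 32-bit Java JDK. I have not looked into the second and fourth approaches, so I won't discuss them here.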
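In the end I used none of the approaches above and went with Thrift instead. It is a bit slower than the Java API, but still within an acceptable range. Now for the main topic: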
Preparation:
1. Download the Thrift source package from http://thrift.apache.org/
2. Download the Thrift compiler for Windows: http://www.apache.org/dyn/closer.cgi?path=/thrift/0.9.0/thrift-0.9.0.exe
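Generating the Thrift interface classes:
1. Get the Hbase.thrift file from the HBase package (it is in ..\hbase-0.94.6.1\src\main\resources\org\apache\hadoop\hbase\thrift).
2. Put thrift-0.9.0.exe and Hbase.thrift in the same directory (this is convenient but not required).
3. Open a command prompt in that directory and run: thrift-0.9.0.exe -gen csharp Hbase.thrift
   This creates a folder named gen-csharp in the same directory.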
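Building the solution
With everything prepared, create a new Visual Studio project and add both the Thrift C# library project from the source package and the interface classes you just generated.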
Starting the cluster's Thrift service
hbase-daemon.sh start thrift (the default port is 9090)
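Writing test code
Among the interface classes generated from Hbase.thrift, Hbase.cs is the core one: it defines and implements every interface the client uses to operate on HBase. For the Thrift server-side code, see the HBase source.
The query interfaces defined in this class are: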
```csharp
List<TRowResult> getRow(byte[] tableName, byte[] row, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowWithColumns(byte[] tableName, byte[] row, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowTs(byte[] tableName, byte[] row, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowWithColumnsTs(byte[] tableName, byte[] row, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRows(byte[] tableName, List<byte[]> rows, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowsWithColumns(byte[] tableName, List<byte[]> rows, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowsTs(byte[] tableName, List<byte[]> rows, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> getRowsWithColumnsTs(byte[] tableName, List<byte[]> rows, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithScan(byte[] tableName, TScan scan, Dictionary<byte[], byte[]> attributes);
int scannerOpen(byte[] tableName, byte[] startRow, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithStop(byte[] tableName, byte[] startRow, byte[] stopRow, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithPrefix(byte[] tableName, byte[] startAndPrefix, List<byte[]> columns, Dictionary<byte[], byte[]> attributes);
int scannerOpenTs(byte[] tableName, byte[] startRow, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
int scannerOpenWithStopTs(byte[] tableName, byte[] startRow, byte[] stopRow, List<byte[]> columns, long timestamp, Dictionary<byte[], byte[]> attributes);
List<TRowResult> scannerGet(int id);
List<TRowResult> scannerGetList(int id, int nbRows);
void scannerClose(int id);
```
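The *Ts variants above (getRowTs, scannerOpenTs and so on) take a long timestamp. HBase stamps cells with milliseconds since the Unix epoch by default, so a .NET time has to be converted before it is passed in. A minimal sketch under that assumption; HBaseTime and its methods are hypothetical names. It only converts units and makes no claim about how each Ts call interprets the bound.

```csharp
using System;

// Hypothetical helper: converts between .NET times and HBase-style timestamps,
// assuming HBase's default unit of milliseconds since 1970-01-01 00:00:00 UTC.
public static class HBaseTime
{
    // The offset is honoured, so a Beijing-time (+08:00) value converts correctly
    public static long ToTimestamp(DateTimeOffset time) => time.ToUnixTimeMilliseconds();

    public static DateTimeOffset FromTimestamp(long timestamp) =>
        DateTimeOffset.FromUnixTimeMilliseconds(timestamp);

    // Current time, e.g. as the timestamp argument of getRowTs
    public static long Now() => DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
}
```

For example, client.getRowTs(table, row, HBaseTime.ToTimestamp(someTime), null) passes a properly converted value; using DateTimeOffset rather than DateTime avoids ambiguity about the local time zone.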
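Here are a few commonly used interfaces, based on how I used them in my project (anyone who has used the HBase API will find the interfaces above familiar):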
1. getRow (the simple case: fetch data by rowkey; most of the parameters are byte arrays)
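Since almost every parameter is a byte array, a couple of tiny conversion helpers keep calls such as getRow readable. A minimal sketch; HBytes, B, S and Columns are hypothetical names, and UTF-8 is assumed as in the rest of this post.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helpers for the byte[]-heavy Thrift signatures (UTF-8 assumed throughout)
public static class HBytes
{
    // string -> byte[] for table names, row keys, column names and values
    public static byte[] B(string s) => Encoding.UTF8.GetBytes(s);

    // byte[] -> string for printing TRowResult.Row, column names and cell values
    public static string S(byte[] bytes) => Encoding.UTF8.GetString(bytes);

    // Builds the List<byte[]> columns argument from "family:qualifier" names
    public static List<byte[]> Columns(params string[] names)
    {
        var columns = new List<byte[]>(names.Length);
        foreach (string name in names)
            columns.Add(B(name));
        return columns;
    }
}
```

With these, a lookup reads client.getRow(HBytes.B("HStudy"), HBytes.B("001"), null), and getRowWithColumns can take HBytes.Columns("s:PatientName"). Note that a value such as 小红 is 6 bytes in UTF-8, so byte lengths and string lengths differ.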
2. scannerOpenWithStop (fetch data by a RowKey range)
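```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Thrift.Protocol;
using Thrift.Transport;

class Program
{
    // Same connection setup as the Helper class below (host and port are the author's)
    static TTransport transport = new TSocket("192.168.2.200", 9090);
    static Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

    /// <summary>
    /// Fetches data by a RowKey range.
    /// </summary>
    /// <param name="tablename">Table name</param>
    /// <param name="stRowkey">Start rowkey (inclusive)</param>
    /// <param name="endRowkey">Stop rowkey (exclusive)</param>
    /// <remarks>The result set includes the StartRowKey row but not the EndRowKey row.</remarks>
    static void GetDataFromHBaseThroughRowKeyRange(string tablename, string stRowkey, string endRowkey)
    {
        transport.Open();
        try
        {
            int ScannerID = client.scannerOpenWithStop(Encoding.UTF8.GetBytes(tablename),
                Encoding.UTF8.GetBytes(stRowkey), Encoding.UTF8.GetBytes(endRowkey),
                new List<byte[]> { Encoding.UTF8.GetBytes("i:Data") }, null);

            // Fetch at most 100 rows from the scanner
            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            foreach (var row in result)
            {
                Console.WriteLine(Encoding.UTF8.GetString(row.Row));
                // Columns maps "family:qualifier" -> TCell
                foreach (var col in row.Columns)
                {
                    Console.Write(Encoding.UTF8.GetString(col.Key) + "\t");
                    Console.WriteLine(Encoding.UTF8.GetString(col.Value.Value));
                    Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
                }
            }
            client.scannerClose(ScannerID); // release the server-side scanner
        }
        finally
        {
            transport.Close();
        }
    }

    // Call
    static void Main(string[] args)
    {
        GetDataFromHBaseThroughRowKeyRange("HStudy", "001", "006");
    }
}
```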
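The remark in the code (start row included, stop row excluded) follows from how HBase orders row keys: byte by byte, comparing unsigned values. A minimal sketch of that rule in plain C#, handy for checking which keys a given range will return; RowKeyRange is a hypothetical name, and treating an empty stop row as "no upper bound" is an assumption based on HBase's Scan semantics.

```csharp
using System;
using System.Text;

// Sketch of the [startRow, stopRow) rule used by scannerOpenWithStop,
// assuming HBase's ordering: unsigned, byte-by-byte lexicographic comparison.
public static class RowKeyRange
{
    public static int Compare(byte[] a, byte[] b)
    {
        int n = Math.Min(a.Length, b.Length);
        for (int i = 0; i < n; i++)
        {
            int diff = a[i] - b[i];      // byte is unsigned in C#, so 0xFF sorts last
            if (diff != 0) return diff;
        }
        return a.Length - b.Length;      // a proper prefix sorts first
    }

    // Start is inclusive, stop is exclusive; an empty stop row means "to the end of the table"
    public static bool InRange(byte[] row, byte[] start, byte[] stop) =>
        Compare(row, start) >= 0 && (stop.Length == 0 || Compare(row, stop) < 0);

    public static bool InRange(string row, string start, string stop) =>
        InRange(Encoding.UTF8.GetBytes(row), Encoding.UTF8.GetBytes(start), Encoding.UTF8.GetBytes(stop));
}
```

Because the comparison is lexicographic, "0055" falls inside ["001", "006") while "01" does not, which is why zero-padding row keys to a fixed width keeps numeric ranges intuitive.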
3. scannerOpenWithPrefix (query by RowKey prefix)
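```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Thrift.Protocol;
using Thrift.Transport;

class Program
{
    // Same connection setup as the Helper class below (host and port are the author's)
    static TTransport transport = new TSocket("192.168.2.200", 9090);
    static Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

    /// <summary>
    /// Filters by RowKey prefix.
    /// </summary>
    /// <param name="tablename">Table name</param>
    /// <param name="Prefixrowkey">RowKey prefix to match</param>
    static void GetDataFromHBaseThroughRowKeyPrefix(string tablename, string Prefixrowkey)
    {
        transport.Open();
        try
        {
            int ScannerID = client.scannerOpenWithPrefix(Encoding.UTF8.GetBytes(tablename),
                Encoding.UTF8.GetBytes(Prefixrowkey),
                new List<byte[]> { Encoding.UTF8.GetBytes("i:Data") }, null);
            /*
             * scannerGet(int id) is implemented on the server by calling
             * scannerGetList(id, 1), i.e. it returns one row per call.
             */
            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            foreach (var row in result)
            {
                Console.WriteLine(Encoding.UTF8.GetString(row.Row));
                foreach (var col in row.Columns)
                {
                    Console.Write(Encoding.UTF8.GetString(col.Key) + "\t");
                    Console.WriteLine(Encoding.UTF8.GetString(col.Value.Value));
                    Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
                }
            }
            client.scannerClose(ScannerID); // release the server-side scanner
        }
        finally
        {
            transport.Close();
        }
    }

    // Call
    static void Main(string[] args)
    {
        GetDataFromHBaseThroughRowKeyPrefix("HStudy", "00");
    }
}
```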
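If a prefix query also needs a timestamp (scannerOpenWithStopTs) or should share range-handling code, the same rows can be selected with scannerOpenWithStop by turning the prefix into a stop row. This is a swapped-in technique, similar in spirit to the Java client's Scan.setRowPrefixFilter in later HBase releases, not a description of how scannerOpenWithPrefix works internally; PrefixRange and StopRowForPrefix are hypothetical names.

```csharp
using System;

// Computes the smallest row key greater than every key that starts with `prefix`,
// so a prefix scan can be expressed as a [prefix, stopRow) range scan.
public static class PrefixRange
{
    public static byte[] StopRowForPrefix(byte[] prefix)
    {
        // Walk back from the last byte, skipping 0xFF bytes (they cannot be incremented)
        for (int i = prefix.Length - 1; i >= 0; i--)
        {
            if (prefix[i] != 0xFF)
            {
                byte[] stop = new byte[i + 1];
                Array.Copy(prefix, stop, i + 1);
                stop[i]++;                 // e.g. "00" -> "01"
                return stop;
            }
        }
        // Empty or all-0xFF prefix: no finite upper bound exists
        return new byte[0];
    }
}
```

A call would look like client.scannerOpenWithStop(table, prefix, PrefixRange.StopRowForPrefix(prefix), columns, null); an empty result means there is no upper bound, in which case scannerOpen (no stop row) is the natural fallback.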
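4. scannerOpenWithScan (query with filters)
This is probably the most involved of the query interfaces, because it uses filters, i.e. the Filter classes of the HBase API. One of its parameters is of type TScan, whose basic structure is: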
```csharp
public partial class TScan : TBase
{
    private byte[] _startRow;
    private byte[] _stopRow;
    private long _timestamp;
    private List<byte[]> _columns;
    private int _caching;
    private byte[] _filterString;
}
```
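I'll skip the first few fields and focus on _filterString (I won't go into the HBase API's various Filters here). Taking the common SingleColumnValueFilter as an example, a filter that matches rows whose PatientName contains 小红 looks like this: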
```csharp
string filterString = "SingleColumnValueFilter('s','PatientName',=,'substring:小红')";
byte[] _filterString = Encoding.UTF8.GetBytes(filterString);
```
To combine several filters, join them with AND.
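Writing filter strings by hand gets error-prone once several conditions are ANDed together or a value contains a quote. A minimal sketch of a string builder; FilterText and its methods are hypothetical names. The quoting rule it applies (a single quote inside a literal is written as two single quotes) comes from the HBase filter language; the operator and the comparator text (=, substring:...) are passed through as-is.

```csharp
using System;
using System.Linq;

// Hypothetical helper for composing HBase filter-language strings for TScan.FilterString
public static class FilterText
{
    // Single quotes inside a string literal are escaped by doubling them
    static string Quote(string s) => "'" + s.Replace("'", "''") + "'";

    // e.g. SingleColumnValueFilter('s','PatientName',=,'substring:Jim')
    public static string SingleColumnValue(string family, string qualifier, string op, string comparator) =>
        $"SingleColumnValueFilter({Quote(family)},{Quote(qualifier)},{op},{Quote(comparator)})";

    // Wraps each filter in parentheses and joins them with AND
    public static string And(params string[] filters) =>
        string.Join(" AND ", filters.Select(f => "(" + f + ")"));
}
```

For a single condition the builder produces exactly the string shown above; FilterText.And(...) over two such filters yields the parenthesised AND form used in the example below, minus the outermost pair of parentheses.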
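```csharp
using System;
using System.Collections.Generic;
using System.Text;
using Thrift.Protocol;
using Thrift.Transport;

class Program
{
    // Same connection setup as the Helper class below (host and port are the author's)
    static TTransport transport = new TSocket("192.168.2.200", 9090);
    static Hbase.Client client = new Hbase.Client(new TBinaryProtocol(transport));

    /// <summary>
    /// Scans data through a Filter.
    /// </summary>
    /// <param name="tablename">Table name</param>
    /// <param name="filterString">Filter expression in the HBase filter language</param>
    /// <param name="_cols">Columns to return ("family:qualifier")</param>
    static void GetDataFromHBaseThroughFilter(string tablename, string filterString, List<byte[]> _cols)
    {
        TScan _scan = new TScan();
        // e.g. SingleColumnValueFilter('i', 'Data', =, '2')
        _scan.FilterString = Encoding.UTF8.GetBytes(filterString);
        _scan.Columns = _cols;

        transport.Open();
        try
        {
            int ScannerID = client.scannerOpenWithScan(Encoding.UTF8.GetBytes(tablename), _scan, null);
            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            foreach (var row in result)
            {
                Console.WriteLine(Encoding.UTF8.GetString(row.Row));
                foreach (var col in row.Columns)
                {
                    Console.Write(Encoding.UTF8.GetString(col.Key) + "\t");
                    Console.WriteLine(Encoding.UTF8.GetString(col.Value.Value));
                    Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
                }
            }
            client.scannerClose(ScannerID); // release the server-side scanner
        }
        finally
        {
            transport.Close();
        }
    }

    static void Main(string[] args)
    {
        // Range query from example 2, against another table:
        // GetDataFromHBaseThroughRowKeyRange("HImages", "123.456.1", "123.456.9");
        List<byte[]> _byte = new List<byte[]>();
        _byte.Add(Encoding.UTF8.GetBytes("s:PatientName"));
        _byte.Add(Encoding.UTF8.GetBytes("s:StudyInstanceUID"));
        _byte.Add(Encoding.UTF8.GetBytes("s:PatientSex"));

        // Two filters combined with AND:
        // string filterString = "((SingleColumnValueFilter('s','PatientName',=,'substring:Jim')) AND (SingleColumnValueFilter('s','PatientSex',=,'substring:10')))";
        string filterString = "SingleColumnValueFilter('s','PatientName',=,'substring:小红')";
        GetDataFromHBaseThroughFilter("HStudy", filterString, _byte);
        Console.ReadLine();
    }
}
```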
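Each scanner example above reads a single batch of at most 100 rows with scannerGetList, so larger results are silently cut off. A minimal sketch of a loop that keeps calling scannerGetList until it returns no rows and then always calls scannerClose. It is written against delegates so it compiles without the generated code; ScannerPager and ReadAll are hypothetical names, and treating an empty batch as the end of the scan is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: drains a Thrift scanner in batches and always closes it.
public static class ScannerPager
{
    // getList: e.g. client.scannerGetList (scanner id, batch size) -> rows
    // close:   e.g. client.scannerClose (scanner id)
    public static List<T> ReadAll<T>(int scannerId,
        Func<int, int, List<T>> getList,
        Action<int> close,
        int batchSize = 100)
    {
        if (batchSize <= 0)
            throw new ArgumentOutOfRangeException(nameof(batchSize));

        var all = new List<T>();
        try
        {
            while (true)
            {
                List<T> batch = getList(scannerId, batchSize);
                // Assumption: an exhausted scanner yields an empty (or null) batch
                if (batch == null || batch.Count == 0)
                    break;
                all.AddRange(batch);
            }
        }
        finally
        {
            // Release the server-side scanner even if a call above threw
            close(scannerId);
        }
        return all;
    }
}
```

Usage would be ScannerPager.ReadAll<TRowResult>(ScannerID, client.scannerGetList, client.scannerClose). This collects every row in memory; for very large scans, processing each batch inside the loop is the safer design.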
Next, the Thrift APIs used most often (creating and deleting tables, inserting, updating and deleting data), followed by a Helper class I wrote to make Thrift easier to use.
```csharp
void createTable(byte[] tableName, List<ColumnDescriptor> columnFamilies);
void mutateRow(byte[] tableName, byte[] row, List<Mutation> mutations, Dictionary<byte[], byte[]> attributes);
void mutateRowTs(byte[] tableName, byte[] row, List<Mutation> mutations, long timestamp, Dictionary<byte[], byte[]> attributes);
void mutateRows(byte[] tableName, List<BatchMutation> rowBatches, Dictionary<byte[], byte[]> attributes);
void mutateRowsTs(byte[] tableName, List<BatchMutation> rowBatches, long timestamp, Dictionary<byte[], byte[]> attributes);
void deleteTable(byte[] tableName);
void deleteAll(byte[] tableName, byte[] row, byte[] column, Dictionary<byte[], byte[]> attributes);
void deleteAllTs(byte[] tableName, byte[] row, byte[] column, long timestamp, Dictionary<byte[], byte[]> attributes);
void deleteAllRow(byte[] tableName, byte[] row, Dictionary<byte[], byte[]> attributes);
void deleteAllRowTs(byte[] tableName, byte[] row, long timestamp, Dictionary<byte[], byte[]> attributes);
```
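mutateRows takes a list of BatchMutation, one entry per row, so writes collected as flat (row, column, value) cells have to be grouped by row first. A minimal sketch of that grouping step in plain C#; CellBatcher and GroupByRow are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical pre-processing step for mutateRows: groups flat (row, column, value)
// cells by row key, keeping rows and cells in first-seen order.
public static class CellBatcher
{
    public static List<(string Row, List<(string Column, string Value)> Cells)> GroupByRow(
        IEnumerable<(string Row, string Column, string Value)> cells)
    {
        var groups = new List<(string Row, List<(string Column, string Value)> Cells)>();
        var indexByRow = new Dictionary<string, int>();   // row key -> position in groups

        foreach (var cell in cells)
        {
            if (!indexByRow.TryGetValue(cell.Row, out int i))
            {
                i = groups.Count;
                indexByRow[cell.Row] = i;
                groups.Add((cell.Row, new List<(string Column, string Value)>()));
            }
            // The tuple is copied, but Cells refers to the same list, so Add mutates it
            groups[i].Cells.Add((cell.Column, cell.Value));
        }
        return groups;
    }
}
```

Each group would then become one BatchMutation whose row is the UTF-8 bytes of g.Row and whose mutations are built from g.Cells; the generated property names (Row and Mutations on BatchMutation, Column and Value on Mutation) are assumed from the field names in Hbase.thrift.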
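Note that Thrift uses the same functions (mutateRow and friends) for both inserting and updating rows, which will come as no surprise to anyone who has used the HBase API. These APIs are fairly simple, so here are the Helper class and a simple test program.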
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using ThriftHelper;
using IThrift;
using Thrift.Transport;
using Thrift.Protocol;

namespace Test
{
    class Program
    {
        /*
         * Table:         ITest
         * Column family: i
         * Column:        one
         */
        static void Main(string[] args)
        {
            #region Test

            Helper.Open();
            Console.WriteLine("CreateTable:");
            ColumnDescriptor _cd = new ColumnDescriptor();
            _cd.Name = Encoding.UTF8.GetBytes("i");

            if (Helper.CreateTable("ITest", new List<ColumnDescriptor> { _cd }))
                Console.WriteLine("CreateTable is Success");
            else
                Console.WriteLine("CreateTable Occurred Error");

            Console.WriteLine("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");
            Console.WriteLine("MutateRowHBase:");
            // Insert: write "1" to i:one of row 001
            Mutation _mutation = new Mutation();
            _mutation.Column = Encoding.UTF8.GetBytes("i:one");
            _mutation.Value = Encoding.UTF8.GetBytes("1");

            if (Helper.MutateRowHBase("ITest", "001", new List<Mutation> { _mutation }))
                Console.WriteLine("MutateRowHBase is Success");
            else
                Console.WriteLine("MutateRowHBase Occurred Error");

            Console.WriteLine("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Console.WriteLine("GetDataFromHBase:");
            List<TRowResult> _result = Helper.GetDataFromHBase("ITest", "001");
            Printer(_result);

            Console.WriteLine("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Console.WriteLine("MutateRowHBase:");
            // Update: the same call writes a new version of i:one, so reads now return "-1"
            _mutation = new Mutation();
            _mutation.Column = Encoding.UTF8.GetBytes("i:one");
            _mutation.Value = Encoding.UTF8.GetBytes("-1");

            if (Helper.MutateRowHBase("ITest", "001", new List<Mutation> { _mutation }))
                Console.WriteLine("MutateRowHBase is Success");
            else
                Console.WriteLine("MutateRowHBase Occurred Error");

            Console.WriteLine("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Console.WriteLine("GetDataFromHBase:");
            _result = Helper.GetDataFromHBase("ITest", "001");
            Printer(_result);

            Console.WriteLine("++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++");

            Console.WriteLine("DeleteRow:");
            if (Helper.DeleteAllRow("ITest", "001"))
                Console.WriteLine("DeleteAllRow is Success");
            else
                Console.WriteLine("DeleteAllRow Occurred Error");
            Helper.Close();

            #endregion

            Console.ReadKey();
        }

        // Prints each row key, then every "column <TAB> value" pair of that row
        static void Printer(List<TRowResult> result)
        {
            if (result.Count == 0)
                return;
            foreach (var row in result)
            {
                Console.WriteLine(Encoding.UTF8.GetString(row.Row));

                foreach (var col in row.Columns)
                {
                    Console.Write(Encoding.UTF8.GetString(col.Key) + "\t");
                    Console.WriteLine(Encoding.UTF8.GetString(col.Value.Value));
                    Console.WriteLine("++++++++++++++++++++++++++++++++++++++");
                }
            }
        }

        static void Printer(string content)
        {
            Console.Write(content);
        }
    }
}
```
Helper class:
```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Thrift;
using IThrift;
using Thrift.Transport;
using Thrift.Protocol;

namespace ThriftHelper
{
    public static class Helper
    {
        // One shared connection to the HBase Thrift server (default port 9090)
        static TTransport transport = new TSocket("192.168.2.200", 9090);
        static TProtocol tProtocol = new TBinaryProtocol(transport);
        static Hbase.Client client = new Hbase.Client(tProtocol);

        public static void Open()
        {
            transport.Open();
        }

        public static void Close()
        {
            transport.Close();
        }

        /// <summary>
        /// Gets data by rowkey.
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="rowkey">Row key</param>
        public static List<TRowResult> GetDataFromHBase(string tablename, string rowkey)
        {
            List<TRowResult> result = client.getRow(Encoding.UTF8.GetBytes(tablename),
                Encoding.UTF8.GetBytes(rowkey), null);
            return result;
        }

        /// <summary>
        /// Filters by RowKey prefix (returns at most 100 rows).
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="Prefixrowkey">RowKey prefix</param>
        /// <param name="_cols">Columns to return, as "family:qualifier"</param>
        public static List<TRowResult> GetDataFromHBaseThroughRowKeyPrefix(string tablename, string Prefixrowkey, List<string> _cols)
        {
            List<byte[]> _bytes = new List<byte[]>();
            foreach (string str in _cols)
                _bytes.Add(Encoding.UTF8.GetBytes(str));

            int ScannerID = client.scannerOpenWithPrefix(Encoding.UTF8.GetBytes(tablename),
                Encoding.UTF8.GetBytes(Prefixrowkey), _bytes, null);
            // scannerGet(id) would fetch one row per call (the server calls scannerGetList(id, 1))
            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            client.scannerClose(ScannerID);
            return result;
        }

        /// <summary>
        /// Gets data by a RowKey range (returns at most 100 rows).
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="stRowkey">Start rowkey (inclusive)</param>
        /// <param name="endRowkey">Stop rowkey (exclusive)</param>
        /// <param name="_cols">Columns to return, as "family:qualifier"</param>
        /// <remarks>The result set includes the StartRowKey row but not the EndRowKey row.</remarks>
        public static List<TRowResult> GetDataFromHBaseThroughRowKeyRange(string tablename,
            string stRowkey, string endRowkey, List<string> _cols)
        {
            List<byte[]> _bytes = new List<byte[]>();
            foreach (string str in _cols)
                _bytes.Add(Encoding.UTF8.GetBytes(str));

            int ScannerID = client.scannerOpenWithStop(Encoding.UTF8.GetBytes(tablename),
                Encoding.UTF8.GetBytes(stRowkey), Encoding.UTF8.GetBytes(endRowkey), _bytes, null);

            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            client.scannerClose(ScannerID);
            return result;
        }

        /// <summary>
        /// Scans data through a Filter (returns at most 100 rows).
        /// </summary>
        /// <param name="tablename">Table name</param>
        /// <param name="filterString">Filter expression in the HBase filter language</param>
        /// <param name="_cols">Columns to return</param>
        public static List<TRowResult> GetDataFromHBaseThroughFilter(string tablename, string filterString, List<byte[]> _cols)
        {
            TScan _scan = new TScan();
            // e.g. SingleColumnValueFilter('i', 'Data', =, '2')
            _scan.FilterString = Encoding.UTF8.GetBytes(filterString);
            _scan.Columns = _cols;

            int ScannerID = client.scannerOpenWithScan(Encoding.UTF8.GetBytes(tablename), _scan, null);

            List<TRowResult> result = client.scannerGetList(ScannerID, 100);
            client.scannerClose(ScannerID);
            return result;
        }

        // Inserts or updates (same call) the given columns of one row
        public static bool MutateRowHBase(string tablename, string rowkey, List<Mutation> _mutations)
        {
            try
            {
                client.mutateRow(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey), _mutations, null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        // Inserts or updates several rows in one call
        public static bool MutateRowsHBase(string tablename, List<BatchMutation> _BatchMutation)
        {
            try
            {
                client.mutateRows(Encoding.UTF8.GetBytes(tablename), _BatchMutation, null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        // Deletes one column ("family:qualifier") of a row
        public static bool DeleteRowHBase(string tablename, string rowkey, string column)
        {
            try
            {
                client.deleteAll(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey),
                    Encoding.UTF8.GetBytes(column), null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        // Deletes a whole row
        public static bool DeleteAllRow(string tablename, string rowkey)
        {
            try
            {
                client.deleteAllRow(Encoding.UTF8.GetBytes(tablename), Encoding.UTF8.GetBytes(rowkey), null);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        public static bool DeleteTable(string tablename)
        {
            try
            {
                // HBase only deletes disabled tables, so disable the table first
                client.disableTable(Encoding.UTF8.GetBytes(tablename));
                client.deleteTable(Encoding.UTF8.GetBytes(tablename));
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }

        public static bool CreateTable(string tablename, List<ColumnDescriptor> _cols)
        {
            try
            {
                client.createTable(Encoding.UTF8.GetBytes(tablename), _cols);
                return true;
            }
            catch (Exception)
            {
                return false;
            }
        }
    }
}
```
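The Helper methods catch every exception and simply return false, so the caller cannot tell a missing table from a dropped connection. A minimal sketch of one alternative that keeps the bool result but hands the caught exception back; SafeCall and TryRun are hypothetical names, not part of Thrift or the generated code.

```csharp
using System;

// Hypothetical alternative to the Helper's catch-and-return-false pattern:
// still returns a bool, but also passes the caught exception to the caller.
public static class SafeCall
{
    public static bool TryRun(Action action, out Exception error)
    {
        try
        {
            action();
            error = null;
            return true;
        }
        catch (Exception ex)
        {
            // Keep whatever the transport or server reported instead of discarding it
            error = ex;
            return false;
        }
    }
}
```

Inside the Helper, a call could then read: if (!SafeCall.TryRun(() => client.deleteTable(Encoding.UTF8.GetBytes(tablename)), out Exception err)) Console.WriteLine(err.Message);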
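That's it for the basic Thrift operations. Thrift also supports some of HBase's more advanced operations, which I'll cover in future posts. Thanks for reading; my knowledge is limited, so please forgive any shortcomings.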