1
2
3
4
5
6
7
8
9
10
11 """ Various storage (molecular and otherwise) functionality
12
13 """
14 from rdkit import RDConfig
15 from rdkit.Dbase import DbModule
16
17
def ValidateRDId(ID):
    """ returns whether or not an RDId is valid

    An RDId has the form ``leadText-xxx-xxx-y`` (underscores are accepted in
    place of dashes).  The Id is valid when the sum of all digits in the
    number blocks, modulo 10, equals the trailing checksum ``y``.

    Returns 1 for a valid Id and 0 otherwise; malformed input never raises.

    >>> ValidateRDId('RDCmpd-000-009-9')
    1
    >>> ValidateRDId('RDCmpd-009-000-009-8')
    1
    >>> ValidateRDId('RDCmpd-009-000-109-8')
    0
    >>> ValidateRDId('bogus')
    0
    >>> ValidateRDId('RDCmpd-000-009-x')
    0

    """
    splitId = ID.replace('_', '-').split('-')
    # need at least: lead text, two number blocks and the checksum
    if len(splitId) < 4:
        return 0
    accum = 0
    for entry in splitId[1:-1]:
        for char in entry:
            try:
                accum += int(char)
            except ValueError:
                return 0
    # a non-numeric checksum means the Id is invalid, not a crash
    try:
        crc = int(splitId[-1])
    except ValueError:
        return 0
    # normalise to 0/1 so that every return path has the same type
    return int(accum % 10 == crc)
45
46
def RDIdToInt(ID, validate=1):
    """ Returns the integer index for a given RDId
    Throws a ValueError on error

    The number blocks between the lead text and the trailing checksum are
    read as base-1000 digits, most significant block first.  When
    ``validate`` is true the Id is checked with ValidateRDId first.

    >>> RDIdToInt('RDCmpd-000-009-9')
    9
    >>> RDIdToInt('RDCmpd-009-000-009-8')
    9000009
    >>> RDIdToInt('RDData_000_009_9')
    9
    >>> try:
    ...   RDIdToInt('RDCmpd-009-000-109-8')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok
    >>> try:
    ...   RDIdToInt('bogus')
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if validate and not ValidateRDId(ID):
        raise ValueError("Bad RD Id")
    # drop the lead text (first field) and the checksum (last field)
    blocks = ID.replace('_', '-').split('-')[1:-1]
    # least significant block first, each weighted by a power of 1000
    return sum(int(block) * 1000**power for power, block in enumerate(reversed(blocks)))
84
85
def IndexToRDId(idx, leadText='RDCmpd'):
    """ Converts an integer index into an RDId

    The format of the ID is:
      leadText-xxx-xxx-xxx-y
    The number blocks are zero padded and the final digit (y)
    is a checksum (sum of the decimal digits of the index, modulo 10):
    >>> str(IndexToRDId(9))
    'RDCmpd-000-009-9'
    >>> str(IndexToRDId(9009))
    'RDCmpd-009-009-8'

    A millions block is included if it's nonzero:
    >>> str(IndexToRDId(9000009))
    'RDCmpd-009-000-009-8'

    The text at the beginning can be altered:
    >>> str(IndexToRDId(9,leadText='RDAlt'))
    'RDAlt-000-009-9'

    Negative indices are errors:
    >>> try:
    ...   IndexToRDId(-1)
    ... except ValueError:
    ...   print('ok')
    ... else:
    ...   print('failed')
    ok

    """
    if idx < 0:
        raise ValueError('indices must be >= zero')

    # pure integer arithmetic: the float constant 1e6 silently loses
    # precision once the index no longer fits in a double
    millions, remainder = divmod(idx, 1000000)
    thousands, units = divmod(remainder, 1000)

    blocks = [leadText]
    if millions:
        blocks.append('%03d' % millions)
    blocks.append('%03d' % thousands)
    blocks.append('%03d' % units)

    checksum = sum(int(char) for char in str(idx)) % 10
    blocks.append(str(checksum))
    return '-'.join(blocks)
138
139
def GetNextId(conn, table, idColName='Id'):
    """ returns the next available Id in the database

    Reads the ``idColName`` column of ``table`` through ``conn.GetData``,
    converts each stored RDId to its integer index (without validation) and
    returns one more than the largest index found (1 for an empty table).

    see RegisterItem for testing/documentation

    """
    highest = 0
    for row in conn.GetData(table=table, fields=idColName):
        # the id is the first (only) field of every returned row
        highest = max(highest, RDIdToInt(row[0], validate=0))
    return highest + 1
154
155
def GetNextRDId(conn, table, idColName='Id', leadText=''):
    """ returns the next available RDId in the database

    If ``leadText`` is empty, the prefix is taken from the first Id returned
    by ``conn.GetData`` for the table (everything before the first ``-`` or
    ``_``).  The numeric part comes from GetNextId and the result is
    formatted with IndexToRDId.

    see RegisterItem for testing/documentation

    """
    prefix = leadText
    if not prefix:
        # borrow the prefix from an Id that is already stored
        firstId = conn.GetData(table=table, fields=idColName)[0][0]
        prefix = firstId.replace('_', '-').split('-')[0]

    nextIndex = GetNextId(conn, table, idColName=idColName)
    return IndexToRDId(nextIndex, leadText=prefix)
169
170
def RegisterItem(conn, table, value, columnName, data=None, id='', idColName='Id',
                 leadText='RDCmpd'):
    """ Registers a single item, unless ``value`` is already present

    The table is searched for a row whose ``columnName`` equals ``value``.
    If one is found, ``(0, existingId)`` is returned and nothing is written.
    Otherwise the Id is ``id`` if provided, or the next free RDId; when
    ``data`` is provided a row consisting of the Id followed by ``data`` is
    inserted and committed.  The return value is then ``(1, newId)``.

    >>> from rdkit.Dbase.DbConnection import DbConnect
    >>> conn = DbConnect(tempDbName)
    >>> tblName = 'StorageTest'
    >>> conn.AddTable(tblName,'id varchar(32) not null primary key,label varchar(40),val int')
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(1, 'RDCmpd-000-001-1')
    True
    >>> RegisterItem(conn,tblName,'label2','label',['label2',1])==(1, 'RDCmpd-000-002-2')
    True
    >>> RegisterItem(conn,tblName,'label1','label',['label1',1])==(0, 'RDCmpd-000-001-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-003-3'
    >>> tuple(conn.GetData(table=tblName)[0])==('RDCmpd-000-001-1', 'label1', 1)
    True

    It's also possible to provide ids by hand:
    >>> RegisterItem(conn,tblName,'label10','label',['label10',1],
    ...              id='RDCmpd-000-010-1')==(1, 'RDCmpd-000-010-1')
    True
    >>> str(GetNextRDId(conn,tblName))
    'RDCmpd-000-011-2'

    """
    cursor = conn.GetCursor()
    lookup = 'select %s from %s where %s=%s' % (idColName, table, columnName, DbModule.placeHolder)
    cursor.execute(lookup, (value, ))
    existing = cursor.fetchone()
    if existing:
        # already registered: hand back the stored Id
        return 0, existing[0]

    if id:
        newId = id
    else:
        newId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    if data:
        conn.InsertData(table, [newId] + list(data))
        conn.Commit()
    return 1, newId
210
211
def RegisterItems(conn, table, values, columnName, rows, startId='', idColName='Id',
                  leadText='RDCmpd'):
    """ Registers several items at once, skipping values already present

    Arguments:
      - conn: database connection providing GetCursor() and Commit()
      - table: name of the table to register into
      - values: the values to look up in ``columnName``, one per item
      - columnName: the column that ``values`` are compared against
      - rows: per-item data (without the Id column); may be empty, in which
        case Ids are assigned but nothing is inserted
      - startId: optional RDId to start numbering from; if empty the next
        free RDId in the table is used
      - idColName: name of the Id column
      - leadText: prefix for newly generated RDIds

    Returns a 2-tuple ``(nNew, ids)`` where ``nNew`` is the number of values
    that were not already in the table and ``ids`` holds the Id of every
    value, in the same order as ``values``.

    Raises ValueError if ``rows`` is provided with a length different from
    ``values``.

    NOTE(review): assumes the entries of ``values`` are distinct and that
    ``columnName`` is unique in the table -- confirm against callers.

    """
    if rows and len(rows) != len(values):
        raise ValueError("length mismatch between rows and values")
    nVals = len(values)
    origOrder = {}
    for i, v in enumerate(values):
        origOrder[v] = i

    curs = conn.GetCursor()
    curs.execute("create temporary table regitemstemp (%s)" % columnName)
    try:
        # use the backend's placeholder rather than assuming '?'
        curs.executemany("insert into regitemstemp values (%s)" % DbModule.placeHolder,
                         [(x, ) for x in values])
        # format the *whole* statement: '%' binds tighter than '+', so the
        # pieces must be joined before the substitution is applied
        query = ('select %s,%s from %s where %s in (select * from regitemstemp)' %
                 (columnName, idColName, table, columnName))
        curs.execute(query)
        dbData = curs.fetchall()
    finally:
        # without this a second call on the same connection fails because
        # the temporary table already exists
        curs.execute("drop table regitemstemp")

    # ids of the values that are already registered, in input order
    ids = [None] * nVals
    for val, ID in dbData:
        ids[origOrder[val]] = ID

    if dbData and len(dbData) == nVals:
        # everything is already registered
        return 0, ids

    if not startId:
        startId = GetNextRDId(conn, table, idColName=idColName, leadText=leadText)
    startId = RDIdToInt(startId)

    rowsToInsert = []
    for i in range(nVals):
        if ids[i] is None:
            ID = IndexToRDId(startId, leadText=leadText)
            startId += 1
            ids[i] = ID
            if rows:
                rowsToInsert.append([ID] + list(rows[i]))
    if rowsToInsert:
        nCols = len(rowsToInsert[0])
        # repeat the placeholder as a unit; multiplying the string and
        # joining would split multi-character placeholders such as '%s'
        qs = ','.join([DbModule.placeHolder] * nCols)
        curs.executemany('insert into %s values (%s)' % (table, qs), rowsToInsert)
        conn.Commit()
    return len(values) - len(dbData), ids
259
260
261
262
263
264
# Extra doctests checking that IndexToRDId, ValidateRDId and RDIdToInt are
# mutually consistent; exposed to doctest through the module-level __test__.
# Results are compared as booleans so the expected output matches what
# Python 3 prints.
_roundtripTests = """
>>> bool(ValidateRDId(IndexToRDId(100)))
True
>>> bool(ValidateRDId(IndexToRDId(10000,leadText='foo')))
True
>>> indices = [1,100,1000,1000000]
>>> vals = []
>>> for idx in indices:
...   vals.append(RDIdToInt(IndexToRDId(idx)))
>>> vals == indices
True

"""
__test__ = {"roundtrip": _roundtripTests}
279
280
def _test():
    """ Runs the doctests of the ``__main__`` module verbosely and returns
    doctest's (failed, attempted) result. """
    import sys
    import doctest
    mainModule = sys.modules["__main__"]
    return doctest.testmod(mainModule, verbose=True)
285
286
if __name__ == '__main__':
    # Script entry point: run the doctests against a scratch database and
    # exit with the number of failures.  tempDbName must stay a module-level
    # name because the RegisterItem doctest refers to it.
    import sys
    import tempfile
    import shutil
    import os
    if RDConfig.useSqlLite:
        tmpf, tempName = tempfile.mkstemp(suffix='sqlt')
        # mkstemp hands back an open OS-level descriptor; close it so it is
        # not leaked and does not block the later unlink on some platforms
        os.close(tmpf)
        tempDbName = tempName
        shutil.copyfile(RDConfig.RDTestDatabase, tempDbName)
    else:
        tempDbName = '::RDTests'
    try:
        failed, tried = _test()
    finally:
        # best-effort removal of the scratch database, even if the tests blew up
        if RDConfig.useSqlLite and os.path.exists(tempDbName):
            try:
                os.unlink(tempDbName)
            except OSError:
                import traceback
                traceback.print_exc()
    sys.exit(failed)
306