DBSync.cc
Go to the documentation of this file.
1 //-------------------------------------------------------------------------------------------------------------------------------
2 // MYSQL Database Synchronization Utility
3 //
4 // Jared MacLeod jmacleod@lynx.neu.edu
5 // Tom Paul tom.paul@cern.ch
6 //
7 // This program will only work if both the Main Mysql Server and the local one have the same tables setup identically for the database.
8 //
9 // INPUT- Takes command line input. Must be in correct order. See Usage below. Works on one database at a time.
10 //
11 // Uses input information to connect to 2 mysql servers with the same database and syncs them using each table's "<table name>_id" key column as reference.
12 // Note that all tables need such a "<table name>_id" column in order to be synchronized.
13 //
14 //
15 //-------------------------------------------------------------------------------------------------------------------------------
16 
#include <mysql/mysql.h>

#include <cstdlib>
#include <ctime>
#include <iostream>
#include <iterator>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
23 
24 using namespace std;
25 
26 
// Prints the command line synopsis to standard output and terminates the program.
//
// pname : name under which the program was invoked (normally argv[0]).
//
// This function never returns: it always ends the process with EXIT_SUCCESS,
// including when it is called because a required argument is missing.
void
DisplayUsage(const char* const pname)
{
  std::cout << "Usage : " << pname << " "
               "-d {database name(s)} -r {remote server to read from} "
               "-U {username on read server} -P {password on read server} "
               "-w {local server to write to} -u {username on write server} "
               "-p {password on write server} [-o {port on read server} "
               "-i {port on write server}]\n"
            << std::endl;

  std::exit(EXIT_SUCCESS);
}
40 
41 
// We do not want an unnecessary dependency, therefore this poor-man's
// implementation of boost::lexical_cast.
//
// Writes x into a string stream and reads it back as a T.
//
// T : target type; must be default constructible and extractable with operator>>.
// U : source type; must be insertable with operator<<.
//
// Returns the converted value. Characters left over after a successful
// extraction are ignored (e.g. "12abc" converts to 12 for T = int).
//
// Throws std::invalid_argument if nothing could be extracted as a T.
template<typename T, typename U>
T
LexicalCast(const U& x)
{
  std::stringstream ss;
  ss << x;
  T y{};
  // A bare "throw;" outside of a handler would call std::terminate, so a real
  // exception object is thrown to give callers the chance to handle the failure.
  if (!(ss >> y))
    throw std::invalid_argument("LexicalCast: value cannot be converted to the requested type");
  return y;
}
55 
56 
57 int
58 main(int argc, char* argv[])
59 {
60  string databaseName;
61  string dbserverMain;
62  string usernameMain;
63  string passwordMain;
64  string dbserverLocal;
65  string usernameLocal;
66  string passwordLocal;
67  string localPortString = "3306";
68  string remotePortString = "3306";
69 
70  if (argc == 1)
71  DisplayUsage(argv[0]);
72 
73  for (int i = 1; i < argc && *argv[i] == '-'; ++i)
74  for (const char* p = &argv[i][1]; *p != '\0'; ++p)
75  switch (*p) {
76  case 'd':
77  if (argv[i+1])
78  databaseName = argv[++i];
79  break;
80  case 'r':
81  if (argv[i+1])
82  dbserverMain = argv[++i];
83  break;
84  case 'U':
85  if (argv[i+1])
86  usernameMain = argv[++i];
87  break;
88  case 'P':
89  if (argv[i+1])
90  passwordMain = argv[++i];
91  break;
92  case 'w':
93  if (argv[i+1])
94  dbserverLocal = argv[++i];
95  break;
96  case 'u':
97  if (argv[i+1])
98  usernameLocal = argv[++i];
99  break;
100  case 'p':
101  if (argv[i+1])
102  passwordLocal = argv[++i];
103  break;
104  case 'o':
105  if (argv[i+1])
106  remotePortString = argv[++i];
107  break;
108  case 'i':
109  if (argv[i+1])
110  localPortString = argv[++i];
111  break;
112  default:
113  break;
114  }
115 
116  if (databaseName.empty() ||
117  dbserverMain.empty() ||
118  usernameMain.empty() ||
119  passwordMain.empty() ||
120  dbserverLocal.empty() ||
121  usernameLocal.empty() ||
122  passwordLocal.empty()) {
123  cout << "Required argument is missing." << endl;
124  DisplayUsage(argv[0]);
125  }
126 
127  // Parse the databaseName string and convert it into multiple strings, each with the name of a single database
128  istringstream databaseNameStream(databaseName);
129  const vector<string> databaseNames{istream_iterator<string>(databaseNameStream),
130  istream_iterator<string>()};
131 
132  // Some checking should go here to ensure databases are identical before attempting an update
133  // This (probably) means checking the primary keys on the last entry in all tables in the database.
134  // Is that a sufficient check?
135 
136  cout << "\n"
137  "=========================================================================================="
138  << endl;
139 
140  const time_t curTime = time(nullptr);
141  cout << ctime(&curTime) << "\n"
142  "Synchronizing databases from server "
143  << dbserverMain << " "
144  "(port " << remotePortString << ") "
145  "to server " << dbserverLocal << " "
146  "(port " << localPortString << ")\n"
147  "The following database(s) will be affected:\n";
148 
149  for (const auto& db : databaseNames)
150  cout << db << '\n';
151  cout << flush;
152 
153  //--------------------------------------------------------------------------------------------
154  // This section attempts to connect to the databases, and quits if it cannot connect to either.
155  //--------------------------------------------------------------------------------------------
156 
157  MYSQL* sqlMain = nullptr;
158  MYSQL* sqlLocal = nullptr;
159 
160  for (const auto& db : databaseNames) {
161 
162  cout << "------------------- Updating database: " << db << " -----------------------" << endl;
163 
164  sqlMain = mysql_init(nullptr);
165  sqlLocal = mysql_init(nullptr);
166 
167  const int localPort = LexicalCast<int>(localPortString);
168  const int remotePort = LexicalCast<int>(remotePortString);
169 
170  if (!mysql_real_connect(sqlMain, dbserverMain.c_str(),
171  usernameMain.c_str(), passwordMain.c_str(),
172  db.c_str(), remotePort, 0, 0)) {
173  cerr << sqlMain << ' ' << dbserverMain.c_str() << ' '
174  << usernameMain.c_str() << ' '
175  << passwordMain.c_str() << ' '
176  << db.c_str() << ' '
177  << remotePort << "\n"
178  "Failed to connect to remote database: "
179  "Error: " << mysql_error(sqlMain) << endl;
180  mysql_close(sqlMain);
181  mysql_close(sqlLocal);
182  return 1;
183  } else if (!mysql_real_connect(sqlLocal, dbserverLocal.c_str(),
184  usernameLocal.c_str(), passwordLocal.c_str(),
185  db.c_str(), localPort, 0, 0)) {
186  cerr << "Failed to connect to local database: "
187  "Error: " << mysql_error(sqlLocal) << endl;
188  mysql_close(sqlMain);
189  mysql_close(sqlLocal);
190  return 2;
191  }
192 
193  //-----------------------------------------------------------------------
194  //This section gets a list of the tables in the selected database.
195  //-----------------------------------------------------------------------
196 
197  ostringstream query;
198  query << "SHOW TABLES";
199  if (mysql_query(sqlMain, query.str().c_str())) {
200 
201  cerr << "**Error: Could not get a list of tables from Main server: " << dbserverMain << "\n"
202  " Query failed : " << query.str().c_str() << endl;
203 
204  } else {
205 
206  MYSQL_ROW row;
207  MYSQL_ROW row2;
208  MYSQL_RES* databaseRes = nullptr;
209  databaseRes = mysql_store_result(sqlMain);
210  const int numTables = mysql_num_rows(databaseRes);
211 
212  if (!numTables)
213  cerr << "*Warning: database contains no tables!" << endl;
214 
215  for (int i = 0; i < numTables; ++i) {
216  //-------------------------------------------------
217  // Find the last primary key number in local table.
218  //-------------------------------------------------
219 
220  row = mysql_fetch_row(databaseRes);
221  const string tableName = row[0];
222  query.str("");
223 
224  query << "SELECT " << tableName << "_id FROM "
225  << tableName << " ORDER BY " << tableName << "_id DESC LIMIT 1";
226 
227  if (mysql_query(sqlLocal, query.str().c_str())) {
228 
229  cerr << "** Error trying to determine the last modified time for " << tableName << "\n"
230  " Query failed: " << query.str().c_str() << endl;
231 
232  } else {
233 
234  MYSQL_RES* const localPrimaryKeyRes = mysql_store_result(sqlLocal);
235  const int numLocalRecords = mysql_num_rows(localPrimaryKeyRes);
236  int lastLocalPrimaryKey = 0;
237  int lastPrimaryKey = 0;
238 
239  if (numLocalRecords > 0) {
240  const MYSQL_ROW localPrimaryKeyRow = mysql_fetch_row(localPrimaryKeyRes);
241  lastLocalPrimaryKey = LexicalCast<int>(localPrimaryKeyRow[0]);
242  cout << "Updating table: " << tableName << ". Last primary key for local table = " << lastLocalPrimaryKey << endl;
243  } else {
244  lastLocalPrimaryKey = 0;
245  cout << "No records in local table: " << tableName << ". "
246  "Attempting to fill entire local table." << endl;
247  }
248 
249  // First check that local database does not have primary keys with greater value than in remote database.
250  // If this is the case, it implies that local database was modified using some code other than DBSync
251 
252  query.str("");
253  query << "SELECT " << tableName << "_id FROM "
254  << tableName
255  // << " WHERE " << tableName
256  // << "_id <= " << lastLocalPrimaryKey << " "
257  << " ORDER BY " << tableName << "_id DESC LIMIT 1";
258 
259  if (!mysql_query(sqlMain, query.str().c_str())) {
260 
261  MYSQL_RES* const tableRes = mysql_store_result(sqlMain);
262  const int numRows = mysql_num_rows(tableRes);
263 
264  if (numRows > 0) {
265  MYSQL_ROW firstKeyRow = mysql_fetch_row(tableRes);
266  lastPrimaryKey = LexicalCast<int>(firstKeyRow[0]);
267  }
268 
269  if (numRows < numLocalRecords || lastPrimaryKey < lastLocalPrimaryKey) {
270  cout << "MAJOR BIG-TIME ERROR!\n"
271  "The number of records in the local table '" << tableName << "' "
272  "(" << numLocalRecords << ")\n and the number of records in the master table "
273  "(" << numRows << ") are not equal\n"
274  "or the last key in the local table (" << lastLocalPrimaryKey << ") "
275  "and the last key in the master table (" << lastPrimaryKey << ") "
276  "are not equal!\n"
277  "Has someone been writing to the local database?" << endl;
278  exit(1);
279  }
280  mysql_free_result(tableRes);
281  }
282 
283  const int limit = 1000;
284  int numWrites = 0;
285  ostringstream insertString;
286 
287  for (;;) {
288 
289  query.str("");
290  query << "SELECT * FROM " << tableName << " WHERE " << tableName << "_id > " << lastLocalPrimaryKey << " AND "
291  << tableName << "_id <= " << lastLocalPrimaryKey + limit << ';';
292 
293  int numRows = 0;
294 
295  if (mysql_query(sqlMain, query.str().c_str())) {
296 
297  cerr << "**Error Could not get records from Main database\n"
298  " Query failed: " << query.str().c_str() << endl;
299  mysql_close(sqlMain);
300  mysql_close(sqlLocal);
301  exit(3);
302 
303  } else {
304 
305  MYSQL_RES* const tableRes = mysql_store_result(sqlMain);
306  numRows = mysql_num_rows(tableRes);
307 
308  if (!numRows)
309 
310  cout << "No new records. Nothing is being written in the database." << endl;
311 
312  else {
313 
314  const int numFields = mysql_num_fields(tableRes);
315 
316  if (!numFields)
317  cerr << "*Warning: zero fields!" << endl;
318 
319  cout << "*********** Writing " << ++numWrites << ": " << numRows
320  << " new records in " << tableName
321  << " (limit per query = " << limit << ") " << endl;
322 
323  insertString.str("");
324  insertString << "INSERT INTO " << tableName << " VALUES ";
325 
326  for (int j = 0; j < numRows; ++j) {
327 
328  row2 = mysql_fetch_row(tableRes);
329  insertString << "( ";
330 
331  for (int k = 0; k < numFields; ++k) {
332 
333  if (row2[k])
334  insertString << '"' << row2[k] << '"';
335  else
336  insertString << "NULL";
337 
338  if (k < (numFields - 1))
339  insertString << ", ";
340 
341  }
342 
343  if (j < (numRows - 1))
344  insertString << " ), ";
345  else
346  insertString << " );";
347 
348  }
349 
350  if (mysql_query(sqlLocal, insertString.str().c_str()))
351  cerr << "**ERROR. Insert query to the database failed.\n"
352  "query : " << insertString.str() << "\n"
353  "reported error: " << mysql_error(sqlLocal) << endl;
354 
355  }
356 
357  mysql_free_result(tableRes);
358 
359  }
360 
361  if (lastPrimaryKey <= lastLocalPrimaryKey)
362  break;
363 
364  lastLocalPrimaryKey += limit;
365 
366  }
367  }
368  }
369  }
370 
371  mysql_close(sqlMain);
372  mysql_close(sqlLocal);
373 
374  }
375 }
int exit
Definition: dump1090.h:237
T LexicalCast(const U &x)
Definition: DBSync.cc:46
#define U
int main(int argc, char *argv[])
Definition: DBSync.cc:58
void DisplayUsage(const char *const pname)
Definition: DBSync.cc:28

, generated on Tue Sep 26 2023.